Skip to content

py-3rdparty-mediawiki API Documentation

crypt

Created on 25.03.2020

@author: wf

Crypt

Bases: object

Python implementation of PBEWithMD5AndDES see https://github.com/binsgit/PBEWithMD5AndDES and https://gist.github.com/rohitshampur/da5f79c34260150aafc1

converted to class

Source code in wikibot3rd/crypt.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class Crypt(object):
    """
    Python implementation of PBEWithMD5AndDES
    see
    https://github.com/binsgit/PBEWithMD5AndDES
    and
    https://gist.github.com/rohitshampur/da5f79c34260150aafc1

    converted to class

    The DES key and initialization vector are derived from the cypher
    (password) and the salt by repeated MD5 hashing. Messages are padded
    per RFC 2898 Section 6.1 and the encrypted result is base64 encoded.
    """

    # DES block size in bytes - messages are padded to a multiple of it
    BLOCK_SIZE = 8

    def __init__(self, cypher, iterations=20, salt=None):
        """
        construct me with the given cypher iterations and salt

        Args:
            cypher(str): the password the key is derived from
            iterations(int): the number of MD5 hash rounds
            salt(str): the salt to use - if None 8 random bytes are used
        """
        # avoid annoying
        #  /opt/local/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/Crypto/Cipher/blockalgo.py:141: DeprecationWarning: PY_SSIZE_T_CLEAN will be required for '#' formats
        warnings.filterwarnings("ignore", category=DeprecationWarning)

        self.cypher = cypher.encode("utf-8")
        self.iterations = iterations
        if salt is None:
            self.salt = os.urandom(8)
        else:
            self.salt = salt.encode("utf-8")

    @staticmethod
    def getRandomString(rlen=32):
        """
        get a random string of ASCII letters and digits

        Args:
            rlen(int): the length of the string to create

        Returns:
            str: the random string
        """
        # https://docs.python.org/3/library/secrets.html
        alphabet = string.ascii_letters + string.digits
        rstring = "".join(secrets.choice(alphabet) for i in range(rlen))
        return rstring

    @staticmethod
    def getRandomCrypt(cypherLen=32):
        """
        get a Crypt with a random cypher and a random 8 character salt

        Args:
            cypherLen(int): the length of the random cypher

        Returns:
            Crypt: the newly created Crypt
        """
        cypher = Crypt.getRandomString(cypherLen)
        salt = Crypt.getRandomString(8)
        crypt = Crypt(cypher, salt=salt)
        return crypt

    def getCrypt(self):
        """
        get my DES crypt

        Returns:
            a DES cipher in CBC mode - key and initialization vector are
            taken from the iterated MD5 hash of cypher and salt
        """
        hasher = MD5.new()
        hasher.update(self.cypher)
        hasher.update(self.salt)
        result = hasher.digest()

        # iterate over hashes
        for _i in range(1, self.iterations):
            hasher = MD5.new()
            hasher.update(result)
            result = hasher.digest()
        key = result[:8]
        # initialization vector
        iv = result[8:16]
        return DES.new(key, DES.MODE_CBC, iv)

    def encrypt(self, msg):
        """
        encrypt the given message

        Args:
            msg(str): the plain text message

        Returns:
            str: the base64 encoded encrypted message
        """
        # encode first: the padding has to be computed on the byte length,
        # otherwise messages with non-ASCII characters are not block aligned
        plaintext_to_encrypt = msg.encode("utf-8")
        # Pad plaintext per RFC 2898 Section 6.1
        padding = self.BLOCK_SIZE - len(plaintext_to_encrypt) % self.BLOCK_SIZE
        plaintext_to_encrypt += bytes([padding]) * padding
        encoder = self.getCrypt()
        encrypted = encoder.encrypt(plaintext_to_encrypt)
        b64enc = base64.b64encode(encrypted).decode("utf-8")
        return b64enc

    def decrypt(self, encoded):
        """
        decrypt the given message

        Args:
            encoded(str): the base64 encoded encrypted message

        Returns:
            str: the decrypted message without its padding
        """
        enc = base64.b64decode(encoded)
        decoder = self.getCrypt()
        decryptedb = decoder.decrypt(enc)
        if decryptedb:
            # the last byte states how many padding bytes were appended;
            # remove exactly these bytes instead of stripping a character set
            # which would also eat legitimate trailing characters like ","
            padding = decryptedb[-1]
            if 1 <= padding <= self.BLOCK_SIZE and decryptedb.endswith(
                bytes([padding]) * padding
            ):
                decryptedb = decryptedb[:-padding]
            # data without well-formed padding is returned unchanged
        return decryptedb.decode("utf-8")

__init__(cypher, iterations=20, salt=None)

construct me with the given cypher iterations and salt

Source code in wikibot3rd/crypt.py
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, cypher, iterations=20, salt=None):
    """
    construct me with the given cypher iterations and salt

    Args:
        cypher(str): the password - stored UTF-8 encoded
        iterations(int): the number of hash iterations
        salt(str): the salt - stored UTF-8 encoded, 8 random bytes if None
    """
    # silence the PY_SSIZE_T_CLEAN DeprecationWarning raised from
    # Crypto/Cipher/blockalgo.py
    warnings.filterwarnings("ignore", category=DeprecationWarning)

    self.cypher = cypher.encode("utf-8")
    self.iterations = iterations
    self.salt = os.urandom(8) if salt is None else salt.encode("utf-8")

decrypt(encoded)

decrypt the given message

Source code in wikibot3rd/crypt.py
87
88
89
90
91
92
93
94
95
def decrypt(self, encoded):
    """
    decrypt the given message

    Args:
        encoded(str): the base64 encoded encrypted message

    Returns:
        str: the decrypted message without its padding
    """
    enc = base64.b64decode(encoded)
    decoder = self.getCrypt()
    decryptedb = decoder.decrypt(enc)
    if decryptedb:
        # the last byte states how many padding bytes were appended
        # (RFC 2898 Section 6.1); remove exactly these bytes instead of
        # stripping a character set which also eats legitimate trailing
        # characters such as ","
        padding = decryptedb[-1]
        if 1 <= padding <= 8 and decryptedb.endswith(bytes([padding]) * padding):
            decryptedb = decryptedb[:-padding]
        # data without well-formed padding is returned unchanged
    return decryptedb.decode("utf-8")

encrypt(msg)

encrypt the given message

Source code in wikibot3rd/crypt.py
74
75
76
77
78
79
80
81
82
83
84
85
def encrypt(self, msg):
    """
    encrypt the given message

    Args:
        msg(str): the plain text message

    Returns:
        str: the base64 encoded encrypted message
    """
    # encode first: the padding has to be computed on the byte length,
    # otherwise messages with non-ASCII characters are not aligned to the
    # 8 byte block size
    plaintext_to_encrypt = msg.encode("utf-8")
    # Pad plaintext per RFC 2898 Section 6.1
    padding = 8 - len(plaintext_to_encrypt) % 8
    plaintext_to_encrypt += bytes([padding]) * padding
    encoder = self.getCrypt()
    encrypted = encoder.encrypt(plaintext_to_encrypt)
    b64enc = base64.b64encode(encrypted).decode("utf-8")
    return b64enc

getCrypt()

get my DES crypt

Source code in wikibot3rd/crypt.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def getCrypt(self):
    """
    get my DES crypt

    Returns:
        a DES cipher in CBC mode; the first 8 bytes of the iterated MD5
        hash of cypher and salt are the key, the next 8 bytes the
        initialization vector
    """

    def md5Digest(data):
        # a single MD5 round over the given bytes
        hasher = MD5.new()
        hasher.update(data)
        return hasher.digest()

    # the first round hashes cypher followed by salt ...
    derived = md5Digest(self.cypher + self.salt)
    # ... the remaining rounds hash the previous digest
    for _round in range(self.iterations - 1):
        derived = md5Digest(derived)
    key, iv = derived[:8], derived[8:16]
    return DES.new(key, DES.MODE_CBC, iv)

lambda_action

Created on 31.01.2021

@author: wf

Code

Bases: object

a piece of code

Source code in wikibot3rd/lambda_action.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class Code(object):
    """
    a piece of code
    """

    def __init__(self, name: str, text: str, lang: str = "python"):
        """
        construct me from the given text and language

        Args:
            name(str): the name of this piece of code
            text(str): the source text
            lang(str): the language of the text - "jinja" texts are rendered as templates, any other text is executed as python code
        """
        self.name = name
        self.text = text
        self.lang = lang

    def execute(self, context):
        """
        execute my text in the given context

        https://stackoverflow.com/questions/701802/how-do-i-execute-a-string-containing-python-code-in-python
        https://stackoverflow.com/questions/436198/what-is-an-alternative-to-execfile-in-python-3
        https://stackoverflow.com/questions/2220699/whats-the-difference-between-eval-exec-and-compile

        Args:
            context(dict): a dictionary for the exchange of parameters
        """
        if self.lang == "jinja":
            self.executeTemplate(context)
        else:
            # SECURITY: exec runs arbitrary python code - self.text must
            # only ever come from a trusted source
            exec(self.text)

    @staticmethod
    def _getTemplateAttribute(template: str, name: str):
        """
        get the value of a {% set name = "value" %} statement

        Args:
            template(str): the raw jinja template text
            name(str): the name of the variable to look for

        Returns:
            str: the value or None if the template does not set the variable
        """
        match = re.search(
            r"\{% *set +" + re.escape(name) + r" += *['\"](?P<name>.*)['\"] *%\}",
            template,
        )
        # a template without the set statement yields no match at all
        return match.group("name") if match else None

    def executeTemplate(self, context):
        """
        Renders the jinja-template with the query results placed in the given context and stores the result as wiki page.
        The name of the wiki page can be given in the template with set statements.
        E.g.:
            {% set pagetitle = ""%}
            {% set pagetitle_prefix = "List of "%}
        If the pagetitle is empty, like in the example, or not set at all the name variable of the query results is used as pagetitle.
        The pagetitle_prefix is added in all cases, if not defined an empty string is added.
        Assumption: self.text is a jinja template

        Args:
            context(dict): a dictionary for the exchange of parameters - the entries "wikiclient", "rows" and "smw" are read

        Raises:
            ValueError: if neither the template nor the row provides a page title
        """
        raw_template = LambdaAction.unescapeHTML(self.text)
        template = Template(raw_template)
        title = self._getTemplateAttribute(raw_template, "pagetitle")
        pagetitle_prefix = (
            self._getTemplateAttribute(raw_template, "pagetitle_prefix") or ""
        )
        user = context["wikiclient"].wikiUser.user
        for row in context["rows"]:
            page_content = template.render({"row": row, "smw": context["smw"]})
            if not title:
                if not row["name"]:
                    raise ValueError(
                        "Can't save wikipage without a title. Either provide a page title in the "
                        "template or provide the name variable for each query result"
                    )
                pagetitle = pagetitle_prefix + row["name"]
            else:
                pagetitle = pagetitle_prefix + title
            page = context["wikiclient"].getPage(pagetitle)
            # ToDo
            page.edit(
                page_content,
                f"Created with the template [[{self.name}]] by {user} through LambdaActions",
            )

__init__(name, text, lang='python')

construct me from the given text and language

Source code in wikibot3rd/lambda_action.py
19
20
21
22
23
24
25
def __init__(self, name: str, text: str, lang: str = "python"):
    """
    construct me from the given text and language

    Args:
        name(str): the name of this piece of code
        text(str): the source text
        lang(str): the language of the text
    """
    self.name, self.text, self.lang = name, text, lang

execute(context)

https://stackoverflow.com/questions/701802/how-do-i-execute-a-string-containing-python-code-in-python https://stackoverflow.com/questions/436198/what-is-an-alternative-to-execfile-in-python-3 https://stackoverflow.com/questions/2220699/whats-the-difference-between-eval-exec-and-compile

Source code in wikibot3rd/lambda_action.py
27
28
29
30
31
32
33
34
35
36
37
def execute(self, context):
    """
    execute my text: python code is run with exec, a jinja template is
    handed over to executeTemplate

    https://stackoverflow.com/questions/701802/how-do-i-execute-a-string-containing-python-code-in-python
    https://stackoverflow.com/questions/436198/what-is-an-alternative-to-execfile-in-python-3
    https://stackoverflow.com/questions/2220699/whats-the-difference-between-eval-exec-and-compile

    Args:
        context(dict): a dictionary for the exchange of parameters
    """
    if self.lang != "jinja":
        # everything that is not a template is treated as python code
        exec(self.text)
        return
    self.executeTemplate(context)

executeTemplate(context)

Renders the jinja-template with the query results placed in the given context and stores the result as wiki page. The name of the wiki page can be given in the template with set statements. E.g.: {% set pagetitle = ""%} {% set pagetitle_prefix = "List of "%} If the pagetitle is empty, like in the example, the name variable of the query results is used as pagetitle. The pagetitle_prefix is added in all cases; if it is not defined, an empty string is added. Assumption: self.text is a jinja template

Source code in wikibot3rd/lambda_action.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def executeTemplate(self, context):
    """
    Renders the jinja-template with the query results placed in the given context and stores the result as wiki page.
    The name of the wiki page can be given in the template with set statements.
    E.g.:
        {% set pagetitle = ""%}
        {% set pagetitle_prefix = "List of "%}
    If the pagetitle is empty, like in the example, or not set at all the name variable of the query results is used as pagetitle.
    The pagetitle_prefix is added in all cases, if not defined an empty string is added.
    Assumption: self.text is a jinja template

    Args:
        context(dict): a dictionary for the exchange of parameters - the entries "wikiclient", "rows" and "smw" are read

    Raises:
        ValueError: if neither the template nor the row provides a page title
    """

    def getAttribute(template, name):
        # value of the {% set name = "value" %} statement or None if the
        # template does not contain such a statement
        match = re.search(
            r"\{% *set +" + re.escape(name) + r" += *['\"](?P<name>.*)['\"] *%\}",
            template,
        )
        return match.group("name") if match else None

    raw_template = LambdaAction.unescapeHTML(self.text)
    template = Template(raw_template)
    title = getAttribute(raw_template, "pagetitle")
    pagetitle_prefix = getAttribute(raw_template, "pagetitle_prefix") or ""
    user = context["wikiclient"].wikiUser.user
    for row in context["rows"]:
        page_content = template.render({"row": row, "smw": context["smw"]})
        if not title:
            if not row["name"]:
                raise ValueError(
                    "Can't save wikipage without a title. Either provide a page title in the "
                    "template or provide the name variable for each query result"
                )
            pagetitle = pagetitle_prefix + row["name"]
        else:
            pagetitle = pagetitle_prefix + title
        page = context["wikiclient"].getPage(pagetitle)
        # ToDo
        page.edit(
            page_content,
            f"Created with the template [[{self.name}]] by {user} through LambdaActions",
        )

LambdaAction

Bases: object

a lambda action

Source code in wikibot3rd/lambda_action.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class LambdaAction(object):
    """
    a lambda action
    """

    def __init__(self, name: str, query: Query, code: Code):
        """
        Constructor

        Args:
            name(str): the name of this action
            query(Query): the query that provides the rows
            code(Code): the code to execute on the query result
        """
        self.name = name
        self.query = query
        self.code = code

    def executeQuery(self, context):
        """
        run my query and store the result in the "rows" entry of the context

        Queries of type "sql" use the "sqlDB" and queries of type "smw" the
        "smw" entry of the context. If the needed entry is missing, or for
        "sparql" queries (not implemented yet), the rows are None.

        Args:
            context(dict): a dictionary for the exchange of parameters

        Returns:
            the rows of the query result or None
        """
        rows = None
        if self.query.lang == "sql":
            if "sqlDB" in context:
                db = context["sqlDB"]
                rows = db.query(self.query.query)
        elif self.query.lang == "sparql":
            # ToDo
            pass
        elif self.query.lang == "smw":
            if "smw" in context:
                smw = context["smw"]
                query = LambdaAction.unescapeHTML(self.query.query)
                query_results = smw.query(query)
                rows = list(query_results.values())
        else:
            print(
                f"Queries of type {self.query.lang} are currently not supported by LambdaActions."
            )
        context["rows"] = rows
        return rows

    def getMessage(self, context):
        """
        get the message of the "result" entry of the given context

        Args:
            context(dict): a dictionary for the exchange of parameters

        Returns:
            the message or None if there is no result or no message
        """
        message = None
        if "result" in context:
            result = context["result"]
            if "message" in result:
                message = result["message"]
        return message

    def execute(self, context):
        """
        run my query and feed the result into the given code

        Args:
            context(dict): a dictionary for the exchange of parameters
        """
        self.executeQuery(context)
        self.code.execute(context)

    @staticmethod
    def unescapeHTML(value: str):
        """
        Unescapes received html value and removes html tags.
        Only values starting with <pre> are modified.
        Replaces:
            <br /> -> "\\n"
            <pre> -> ""
        Args:
            value(str): html encoded string
        Returns:
            Returns the received value but without the html tags and unescaped.
        """
        if not value.startswith("<pre>"):
            return value
        text = html.unescape(value).replace("<br />", "\n")
        text = text[len("<pre>") :]
        # only remove the closing tag if it is really there - cutting off a
        # fixed number of characters would destroy content otherwise
        if text.endswith("</pre>"):
            text = text[: -len("</pre>")]
        return text

__init__(name, query, code)

Constructor

Source code in wikibot3rd/lambda_action.py
84
85
86
87
88
89
90
def __init__(self, name: str, query: Query, code: Code):
    """
    Constructor

    Args:
        name(str): the name of this action
        query(Query): the query of this action
        code(Code): the code of this action
    """
    self.name, self.query, self.code = name, query, code

execute(context)

run my query and feed the result into the given code

Parameters:

Name Type Description Default
context(dict)

a dictionary for the exchange of parameters

required
Source code in wikibot3rd/lambda_action.py
122
123
124
125
126
127
128
129
130
def execute(self, context):
    """
    execute this action in two steps that share the given context

    Args:
        context(dict): a dictionary for the exchange of parameters
    """
    # step 1: run my query with the context
    self.executeQuery(context)
    # step 2: run my code with the same context
    self.code.execute(context)

unescapeHTML(value) staticmethod

    Unescapes received html value and removes html tags.
    Replaces:
        <br /> -> "\n"
        <pre> -> ""
    Args:
        value(str): html encoded string
    Returns:
        Returns the received value but without the html tags and unescaped.

Source code in wikibot3rd/lambda_action.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@staticmethod
def unescapeHTML(value: str):
    """
    Unescapes received html value and removes html tags.
    Replaces:
        <br /> -> "\n"
        <pre> -> ""
    Args:
        value(str): html encoded string
    Returns:
        Returns the received value but without the html tags and unescaped.
    """
    if value.startswith("<pre>"):
        return html.unescape(value).replace("<br />", "\n")[5:][:-6]
    return value

mwTable

Created on 2020-08-21

@author: wf

MediaWikiTable

Bases: object

helper for https://www.mediawiki.org/wiki/Help:Tables

Source code in wikibot3rd/mwTable.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class MediaWikiTable(object):
    """
    helper for https://www.mediawiki.org/wiki/Help:Tables
    """

    def __init__(
        self, wikiTable=True, colFormats=None, sortable=True, withNewLines=False
    ):
        """
        Constructor

        Args:
            wikiTable(bool): if True add the css class "wikitable"
            colFormats(dict): optional % format strings by column key
            sortable(bool): if True add the css class "sortable"
            withNewLines(bool): if True put each cell on a line of its own
        """
        self.colFormats = colFormats
        # the delimiter blank is bound to the wikitable class only
        cssClasses = ("wikitable " if wikiTable else "") + (
            "sortable" if sortable else ""
        )
        self.start = '{|class="%s"\n' % cssClasses
        self.header = None
        self.content = ""
        self.end = "\n|}\n"
        self.withNewLines = withNewLines

    def addHeader(self, record):
        """
        add the given record as a "sample" header

        Args:
            record(dict): the record whose keys are the column titles
        """
        if self.withNewLines:
            headerStart, firstColDelim, colDelim = "|+", "\n!", "\n!"
        else:
            headerStart, firstColDelim, colDelim = "|+\n", "!", "!!"
        self.header = headerStart
        for index, key in enumerate(record.keys()):
            # only the very first column uses the first column delimiter
            delim = firstColDelim if index == 0 else colDelim
            self.header += "%s%s" % (delim, key)

    def addRow4Dict(self, record):
        """
        add a table row for the given record

        The header is derived from the first record added.

        Args:
            record(dict): the record whose values are the cells of the row
        """
        if self.header is None:
            self.addHeader(record)
        rowStart, colDelim = (
            ("\n|-", "\n|") if self.withNewLines else ("\n|-\n", "||")
        )
        self.content += rowStart
        hasFormats = self.colFormats is not None
        for key in record.keys():
            colFormat = "%s"
            if hasFormats and key in self.colFormats:
                colFormat = self.colFormats[key]
            self.content += ("%s" + colFormat) % (colDelim, record[key])

    def fromListOfDicts(self, listOfDicts):
        """
        add a row for each record of the given list of dicts

        Args:
            listOfDicts(list): the records to add
        """
        for record in listOfDicts:
            self.addRow4Dict(record)

    def noneReplace(self, value):
        """
        replace None by an empty string

        Args:
            value: the value to check

        Returns:
            "" if the value is None otherwise the value itself
        """
        if value is None:
            return ""
        return value

    def asWikiMarkup(self):
        """
        convert me to MediaWiki markup

        Returns:
            string: the MediaWiki Markup for this table
        """
        start = self.noneReplace(self.start)
        header = self.noneReplace(self.header)
        content = self.noneReplace(self.content)
        end = self.noneReplace(self.end)
        return start + header + content + end

__init__(wikiTable=True, colFormats=None, sortable=True, withNewLines=False)

Constructor

Source code in wikibot3rd/mwTable.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(
    self, wikiTable=True, colFormats=None, sortable=True, withNewLines=False
):
    """
    Constructor

    Args:
        wikiTable(bool): if True add the css class "wikitable"
        colFormats(dict): optional % format strings by column key
        sortable(bool): if True add the css class "sortable"
        withNewLines(bool): if True put each cell on a line of its own
    """
    self.colFormats = colFormats
    # the delimiter blank is bound to the wikitable class only
    cssClasses = ("wikitable " if wikiTable else "") + (
        "sortable" if sortable else ""
    )
    self.start = '{|class="%s"\n' % cssClasses
    self.header = None
    self.content = ""
    self.end = "\n|}\n"
    self.withNewLines = withNewLines

addHeader(record)

add the given record as a "sample" header

Source code in wikibot3rd/mwTable.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def addHeader(self, record):
    """
    add the given record as a "sample" header

    Args:
        record(dict): the record whose keys are the column titles
    """
    if self.withNewLines:
        headerStart, firstColDelim, colDelim = "|+", "\n!", "\n!"
    else:
        headerStart, firstColDelim, colDelim = "|+\n", "!", "!!"
    self.header = headerStart
    for index, key in enumerate(record.keys()):
        # only the very first column uses the first column delimiter
        delim = firstColDelim if index == 0 else colDelim
        self.header += "%s%s" % (delim, key)

asWikiMarkup()

convert me to MediaWiki markup

Returns:

Name Type Description
string

the MediaWiki Markup for this table

Source code in wikibot3rd/mwTable.py
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def asWikiMarkup(self):
    """
    convert me to MediaWiki markup

    Returns:
        string: the MediaWiki Markup for this table
    """
    # start, header, content and end are concatenated in this order;
    # parts that are None are replaced by noneReplace
    start = self.noneReplace(self.start)
    header = self.noneReplace(self.header)
    content = self.noneReplace(self.content)
    end = self.noneReplace(self.end)
    return start + header + content + end

pagehistory

PageHistory

Represents the history of a page

Source code in wikibot3rd/pagehistory.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class PageHistory:
    """
    Represents the history of a page
    """

    def __init__(self, pageTitle: str, wikiId: str, debug: bool = False):
        """
        Constructor

        Args:
            pageTitle(str): name of the page
            wikiId(str): id of the wiki the page is located
            debug(bool): If True show debug messages
        """
        self.debug = debug
        self.pageTitle = pageTitle
        self.wikiClient = WikiClient.ofWikiId(wikiId, debug=self.debug)
        self.page = self.wikiClient.getPage(pageTitle)
        self.revisions = self._getRevisions()

    def _getRevisions(self) -> List[PageRevision]:
        """
        Get the revisions of the page as PageRevision object

        Returns:
            List of PageRevisions of the page
        """
        revisions = []
        for revisionRecord in self.page.revisions(
            prop="ids|timestamp|user|userid|comment|size"
        ):
            revision = PageRevision()
            revision.fromDict(revisionRecord)
            revisions.append(revision)
        return revisions

    def exists(self) -> bool:
        """
        Checks if the page exists
        Assumption: If the page exists then there exists at least one revision entry

        Returns:
            True if the page exists otherwise False
        """
        return len(self.revisions) > 0

    def getFirstUser(
        self, reverse: bool = False, limitedUserGroup: List[str] = None
    ) -> Union[str, None]:
        """
        Returns the first user in the revisions

        Args:
            reverse(bool): If False start the search at the oldest entry. Otherwise, search from the newest to the oldest revision
            limitedUserGroup(list): limit the search to the given list. If None all users will be considered.

        Returns:
            str username that matches the search criterion or None if there is no such user
        """
        # sort a copy by revision id - a query must not reorder self.revisions
        revisions = sorted(
            self.revisions, key=lambda r: int(getattr(r, "revid", 0))
        )
        if reverse:
            revisions.reverse()
        for revision in revisions:
            user = getattr(revision, "user", None)
            if user is None:
                # revisions without user information can't match
                continue
            if limitedUserGroup is None or user in limitedUserGroup:
                return user
        return None

__init__(pageTitle, wikiId, debug=False)

Constructor

Parameters:

Name Type Description Default
pageTitle(str)

name of the page

required
wikiId(str)

id of the wiki the page is located

required
debug(bool)

If True show debug messages

required
Source code in wikibot3rd/pagehistory.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(self, pageTitle: str, wikiId: str, debug: bool = False):
    """
    Constructor: looks up the wiki client and the page and loads the
    revisions of the page

    Args:
        pageTitle(str): name of the page
        wikiId(str): id of the wiki the page is located
        debug(bool): If True show debug messages
    """
    self.debug = debug
    self.pageTitle = pageTitle
    # the client is needed first since the page is retrieved via the client
    wikiClient = WikiClient.ofWikiId(wikiId, debug=debug)
    self.wikiClient = wikiClient
    self.page = wikiClient.getPage(pageTitle)
    self.revisions = self._getRevisions()

exists()

Checks if the page exists. Assumption: If the page exists then there exists at least one revision entry

Returns:

Type Description
bool

True if the page exists otherwise False

Source code in wikibot3rd/pagehistory.py
78
79
80
81
82
83
84
85
86
def exists(self) -> bool:
    """
    Checks if the page exists
    Assumption: a page that exists has at least one revision entry

    Returns:
        True if the page exists otherwise False
    """
    revisionCount = len(self.revisions)
    return revisionCount >= 1

getFirstUser(reverse=False, limitedUserGroup=None)

Returns the first user in the revisions

Parameters:

Name Type Description Default
reverse(bool)

If False start the search at the oldest entry. Otherwise, search from the newest to the oldest revision

required
limitedUserGroup(list)

limit the search to the given list. If None all users will be considered.

required

Returns:

Type Description
Union[str, None]

str username that matches the search criterion

Source code in wikibot3rd/pagehistory.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def getFirstUser(
    self, reverse: bool = False, limitedUserGroup: List[str] = None
) -> Union[str, None]:
    """
    Returns the first user in the revisions

    Args:
        reverse(bool): If False start the search at the oldest entry. Otherwise, search from the newest to the oldest revision
        limitedUserGroup(list): limit the search to the given list. If None all users will be considered.

    Returns:
        str username that matches the search criterion or None if there is no such user
    """
    # sort a copy by revision id - a query must not reorder self.revisions
    revisions = sorted(self.revisions, key=lambda r: int(getattr(r, "revid", 0)))
    if reverse:
        revisions.reverse()
    for revision in revisions:
        user = getattr(revision, "user", None)
        if user is None:
            # revisions without user information can't match
            continue
        if limitedUserGroup is None or user in limitedUserGroup:
            return user
    return None

PageRevision

Bases: JSONAble

Represents the metadata of a mediawiki page revision

Source code in wikibot3rd/pagehistory.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class PageRevision(JSONAble):
    """
    Represents the metadata of a mediawiki page revision
    """

    @classmethod
    def getSamples(cls):
        """
        get sample records for page revisions

        Returns:
            list: two sample revision records as dicts
        """
        # first revision of a page created by an anonymous user
        anonymousRevision = {
            "revid": 7056,
            "parentid": 0,
            "user": "127.0.0.1",
            "anon": "",
            "userid": 0,
            "timestamp": "2008-10-14T21:23:09Z",
            "size": 6905,
            "comment": "Event created",
        }
        # follow-up revision by a named user
        userRevision = {
            "revid": 8195,
            "parentid": 8194,
            "user": "Wf",
            "timestamp": "2021-11-11T12:50:31Z",
            "size": 910,
            "comment": "",
        }
        return [anonymousRevision, userRevision]

    def __repr__(self):
        """
        Returns:
            str: my class name followed by my attributes as name=value pairs
        """
        props = ", ".join(f"{name}={value!s}" for name, value in self.__dict__.items())
        return f"{self.__class__.__name__}({props})"

selector

Created on 2020-12-20

Selector

selector class

Source code in wikibot3rd/selector.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class Selector:
    """
    Tkinter based dialog that lets the user pick a subset of a list of items.

    After the dialog is closed ``items`` holds either the list of selected
    entries or the string "Q" when the user quit without confirming.
    """

    def __init__(self, items):
        """
        Constructor

        NOTE(review): the Tk variables below are created without an explicit
        master, so presumably a Tk root window has to exist already
        (``select`` creates one first) - confirm.

        Args:
            items(iterable): the entries to offer; each one is converted to str
        """
        import tkinter as tk

        self.items = [str(item) for item in items]
        self.var = dict()
        self.count = 1
        # the checkbox starts ticked since createWindow preselects all entries
        self.checkvar = tk.IntVar(value=1)
        self.label_test = tk.StringVar()
        self.label_test.set("Select None")
        self.quitProgram = False

    def _listboxWidth(self):
        """Return the listbox width: length of the longest item plus 5 (5 if there are no items)."""
        return max((len(item) for item in self.items), default=0) + 5

    def createWindow(self, root, action, title, description):
        """
        create the Window for the selection list

        Args:
            root(tk.Tk): the window to populate (``select`` passes a fresh tk.Tk())
            action(str): caption of the button that confirms the selection
            title(str): Title of Window
            description(str): Description of Task to do
        Returns:
            None
        """
        import tkinter as tk

        # Title
        root.title(title)  # set window title to given title string
        root.resizable(1, 1)  # set so the window can be resized by user

        # Setting Message to description string
        desc = tk.Message(root, text=description)
        # keep the wrap width of the message in sync with the window width
        desc.bind("<Configure>", lambda e: desc.configure(width=e.width - 10))
        desc.pack(side="top", fill=tk.X)

        # Frame creation for Listbox
        frameList = tk.Frame(root)

        items = tk.StringVar(root)
        items.set(self.items)
        listbox = tk.Listbox(
            frameList,
            listvariable=items,
            selectmode="multiple",
            # sized after the longest entry; also works for an empty list
            width=self._listboxWidth(),
        )
        listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        listbox.select_set(0, tk.END)  # everything is preselected

        # Frame creation for select all/none checkbox
        frameSelect = tk.Frame(root)
        frameSelect.pack(side=tk.TOP, fill=tk.BOTH)
        frameList.pack(fill=tk.BOTH, expand=True)

        # label and checkbutton creation
        label = tk.Label(frameSelect, textvariable=self.label_test)
        check = tk.Checkbutton(
            frameSelect,
            text="",
            variable=self.checkvar,
            command=lambda: self.select_all(listbox),
        )
        check.pack(side=tk.LEFT, anchor="sw")
        label.pack(side=tk.LEFT, anchor="sw")

        # Scrollbar binding and creation
        scrollbar = tk.Scrollbar(frameList)
        scrollbar.pack(side=tk.RIGHT, fill=tk.BOTH)
        listbox.config(yscrollcommand=scrollbar.set)
        scrollbar.config(command=listbox.yview)
        listbox.bind("<<ListboxSelect>>", self.updateCheck)

        # Frame for Action and Quit Buttons
        frameControl = tk.Frame(root)
        frameControl.pack(side=tk.BOTTOM, fill=tk.X)

        # Action and Quit button creation
        actionBtn = tk.Button(
            frameControl,
            text=action,
            bg="green",
            fg="white",
            command=lambda: self.updatePages(root, listbox),
        )
        actionBtn.pack(side=tk.LEFT, anchor="sw")
        quitBtn = tk.Button(
            frameControl,
            text="Quit",
            bg="red",
            fg="white",
            command=lambda: self.quitSelector(root),
        )
        quitBtn.pack(side=tk.RIGHT, anchor="se")

        # closing the window via the cross button is handled like Quit
        root.protocol("WM_DELETE_WINDOW", lambda: self.quitSelector(root))

    def updateCheck(self, event):
        """
        Helper function to change state of select all checkbox

        Args:
            event(Tk event Object): the event whose widget is the listbox
        """
        selectedCount = len(event.widget.curselection())
        itemCount = len(self.items)
        if selectedCount < itemCount:  # check if any item is deselected
            self.label_test.set("Select All")
            self.checkvar.set(0)
        elif selectedCount == itemCount:
            self.label_test.set("Select None")
            self.checkvar.set(1)

    def select_all(self, listbox):
        """
        Checkbox helper function: select all list items when the checkbox
        is ticked, clear the selection otherwise

        Args:
            listbox(Tk Listbox Object): listbox whose selection is updated
        """
        import tkinter as tk

        if self.checkvar.get():
            listbox.select_set(0, tk.END)
            self.label_test.set("Select None")
        else:
            listbox.select_clear(0, tk.END)
            self.label_test.set("Select All")

    def updatePages(self, root, listbox):
        """
        Update function to remove unselected items from list

        Args:
            root(tk.Tk): the window to close afterwards
            listbox(TK listbox Object): listbox holding the selection
        Returns:
            None
        """
        # keep only the entries that are currently selected
        self.items = [listbox.get(idx) for idx in listbox.curselection()]
        root.destroy()

    def quitSelector(self, root):
        """
        Close the window and mark the selection as aborted.

        The marker is the string "Q" stored in ``items``; this method does
        not terminate the program itself.

        Args:
            root(tk.Tk): the window to close
        Returns:
            None
        """
        root.destroy()
        self.items = "Q"

    def getUpdatedPages(self):
        """
        Getter function for the instance variable items

        Returns:
            the list of entries selected in the GUI, or the string "Q"
            if the user quit
        """
        return self.items

    @staticmethod
    def select(selectionList, action="Select", title="Selection", description=""):
        """
        Creates a GUI in which the user can select a subset of the provided selectionList.
        If the user quits the selection the string "Q" is returned instead of a list.
        :param selectionList: the items to choose from
        :param action: name of the action the selection performs. Default is select
        :param title: title of the created window
        :param description: Instructions for the user (consequence of the selection)
        :return: user selected subset of the given selectionList or "Q" if the user quit
        """
        import tkinter as tk

        root = tk.Tk()
        GUI = Selector(selectionList)  # Tkinter Object creation
        GUI.createWindow(
            root, action, title, description
        )  # create Window with given parameters
        root.mainloop()  # Run GUI loop
        selectionList = GUI.getUpdatedPages()  # Get selected items
        return selectionList

__init__(items)

Constructor

Source code in wikibot3rd/selector.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def __init__(self, items):
    """
    Set up the selection state for the given items.

    NOTE(review): the Tk variables are created without an explicit master,
    so presumably a Tk root window must already exist - confirm.

    Args:
        items(iterable): the entries to offer; each one is converted to str
    """
    import tkinter as tk

    # every entry is offered as its string representation
    self.items = [str(entry) for entry in items]
    self.var = {}
    self.count = 1
    # checkbox state and the caption shown next to it
    self.checkvar = tk.IntVar(value=1)
    caption = tk.StringVar()
    caption.set("Select None")
    self.label_test = caption
    self.quitProgram = False

createWindow(root, action, title, description)

create the Window for the selection list Args: root(tk.App Object): tk.App opened object action(str): Type of Action title(str): Title of Window description(str): Description of Task to do Returns: None

Source code in wikibot3rd/selector.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def createWindow(self, root, action, title, description):
    """
    create the Window for the selection list

    Args:
        root(tk.Tk): the window to populate
        action(str): caption of the button that confirms the selection
        title(str): Title of Window
        description(str): Description of Task to do
    Returns:
        None
    """
    import tkinter as tk

    # Title
    root.title(title)  # set window title to given title string
    root.resizable(1, 1)  # set so the window can be resized by user

    # size the listbox after the longest entry; default=0 keeps an empty
    # item list from raising an IndexError
    longest_len = max((len(item) for item in self.items), default=0)

    # Setting Message to description string
    desc = tk.Message(root, text=description)
    # keep the wrap width of the message in sync with the window width
    desc.bind("<Configure>", lambda e: desc.configure(width=e.width - 10))
    desc.pack(side="top", fill=tk.X)

    # Frame creation for Listbox
    frameList = tk.Frame(root)

    items = tk.StringVar(root)
    items.set(self.items)
    listbox = tk.Listbox(
        frameList,
        listvariable=items,
        selectmode="multiple",
        width=longest_len + 5,
    )
    listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    listbox.select_set(0, tk.END)  # everything is preselected

    # Frame creation for select all/none checkbox
    frameSelect = tk.Frame(root)
    frameSelect.pack(side=tk.TOP, fill=tk.BOTH)
    frameList.pack(fill=tk.BOTH, expand=True)

    # label and checkbutton creation
    label = tk.Label(frameSelect, textvariable=self.label_test)
    check = tk.Checkbutton(
        frameSelect,
        text="",
        variable=self.checkvar,
        command=lambda: self.select_all(listbox),
    )
    check.pack(side=tk.LEFT, anchor="sw")
    label.pack(side=tk.LEFT, anchor="sw")

    # Scrollbar binding and creation
    scrollbar = tk.Scrollbar(frameList)
    scrollbar.pack(side=tk.RIGHT, fill=tk.BOTH)
    listbox.config(yscrollcommand=scrollbar.set)
    scrollbar.config(command=listbox.yview)
    listbox.bind("<<ListboxSelect>>", self.updateCheck)

    # Frame for Action and Quit Buttons
    frameControl = tk.Frame(root)
    frameControl.pack(side=tk.BOTTOM, fill=tk.X)

    # Action and Quit button creation
    actionBtn = tk.Button(
        frameControl,
        text=action,
        bg="green",
        fg="white",
        command=lambda: self.updatePages(root, listbox),
    )
    actionBtn.pack(side=tk.LEFT, anchor="sw")
    quitBtn = tk.Button(
        frameControl,
        text="Quit",
        bg="red",
        fg="white",
        command=lambda: self.quitSelector(root),
    )
    quitBtn.pack(side=tk.RIGHT, anchor="se")

    # closing the window via the cross button is handled like Quit
    root.protocol("WM_DELETE_WINDOW", lambda: self.quitSelector(root))

getUpdatedPages()

Getter function for the class variable items Returns: items(list): List of pages selected in GUI by user

Source code in wikibot3rd/selector.py
171
172
173
174
175
176
177
def getUpdatedPages(self):
    """
    Hand out the current value of the instance variable items.

    Returns:
        the value stored in ``items`` - the entries chosen in the GUI
        (quitSelector replaces it with the string "Q")
    """
    currentItems = self.items
    return currentItems

quitSelector(root)

Close the selection window when the Quit button is pressed (or the window is closed) and mark the selection as aborted by setting items to "Q". Args: root(tk.App Object): tk.App opened object Returns: None

Source code in wikibot3rd/selector.py
160
161
162
163
164
165
166
167
168
169
def quitSelector(self, root):
    """
    Close the given window and mark the selection as aborted.

    The abort marker is the string "Q" stored in ``items``; nothing
    else is terminated here.

    Args:
        root(tk.App Object): the opened window to destroy
    Returns:
        None
    """
    quitMarker = "Q"
    root.destroy()
    self.items = quitMarker

select(selectionList, action='Select', title='Selection', description='') staticmethod

Creates a GUI in which the user can select a subset of the provided selectionList. If the user quits the selection, the string "Q" is returned instead of a list. :param selectionList: the items to choose from :param action: name of the action the selection performs. Default is select :param title: title of the created window :param description: Instructions for the user (consequence of the selection) :return: user selected subset of the given selectionList, or "Q" if the user quit

Source code in wikibot3rd/selector.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@staticmethod
def select(selectionList, action="Select", title="Selection", description=""):
    """
    Opens a GUI in which the user can pick a subset of the provided selectionList.
    When the user quits, the value set by quitSelector (the string "Q") is returned.
    :param selectionList: the items to choose from
    :param action: name of the action the selection performs. Default is select
    :param title: title of the created window
    :param description: Instructions for the user (consequence of the selection)
    :return: whatever getUpdatedPages reports once the GUI loop has ended
    """
    import tkinter as tk

    window = tk.Tk()
    # the window is created first, then the selector and its widgets
    selector = Selector(selectionList)
    selector.createWindow(window, action, title, description)
    window.mainloop()  # run the GUI loop
    return selector.getUpdatedPages()

select_all(listbox)

Checkbox helper function that selects all list items when the checkbox is ticked and clears the selection otherwise. Args: listbox(Tk Listbox Object): listbox whose selection is updated

Source code in wikibot3rd/selector.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def select_all(self, listbox):
    """
    Checkbox helper: select every list item when the checkbox variable is
    set, otherwise clear the selection; the label text is switched to match.

    Args:
        listbox(Tk Listbox Object): listbox whose selection is updated
    """
    import tkinter as tk

    if self.checkvar.get():
        listbox.select_set(0, tk.END)
        caption = "Select None"
    else:
        listbox.select_clear(0, tk.END)
        caption = "Select All"
    self.label_test.set(caption)

updateCheck(event)

Helper function to change the state of the select all checkbox. Args: event(Tk event Object): the listbox selection event

Source code in wikibot3rd/selector.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def updateCheck(self, event):
    """
    Keep the select all checkbox and its label in sync with the listbox.

    Args:
        event(Tk event Object): event whose widget provides curselection()
    """
    selectedCount = len(event.widget.curselection())
    itemCount = len(self.items)
    if selectedCount < itemCount:
        # at least one item is deselected
        self.label_test.set("Select All")
        self.checkvar.set(0)
        return
    if selectedCount == itemCount:
        # everything is selected
        self.label_test.set("Select None")
        self.checkvar.set(1)

updatePages(root, listbox)

Update function to remove unselected items from list Args: root(tk.App Object): tk.App opened object listbox(TK listbox Object): listbox to update Returns: None

Source code in wikibot3rd/selector.py
146
147
148
149
150
151
152
153
154
155
156
157
158
def updatePages(self, root, listbox):
    """
    Replace items by the entries currently selected in the listbox
    and close the window.

    Args:
        root(tk.App Object): the opened window to destroy
        listbox(TK listbox Object): listbox holding the selection
    Returns:
        None
    """
    chosen = []
    for position in listbox.curselection():
        chosen.append(listbox.get(position))
    # unselected entries are dropped
    self.items = chosen
    root.destroy()

smw

Created on 2020-05-29

@author: wf

PrintRequest

Bases: object

Source code in wikibot3rd/smw.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class PrintRequest(object):
    """
    construct the given print request
    see https://www.semantic-mediawiki.org/wiki/Serialization_(JSON)
    :ivar smw: SMW context for this printrequest
    :ivar label: the label of the printrequest
    :ivar key:
    :ivar redi:
    :ivar typeid:
    :ivar mode:
    :ivar format:
    """

    # class-wide debug switch; copied to each instance at construction time
    debug = False

    def __init__(self, smw, record):
        """
        construct me from the given record
        Args:
            smw(SMW): the SemanticMediaWiki context of this PrintRequest
            record(dict): the dict derived from the printrequest json serialization

        Raises:
            KeyError: if label, key, redi, typeid or mode is missing in the record
        """
        self.smw = smw
        self.debug = PrintRequest.debug
        if self.debug:
            print(record)
        self.label = record["label"]
        self.key = record["key"]
        self.redi = record["redi"]
        self.typeid = record["typeid"]
        self.mode = int(record["mode"])
        # format is optional
        self.format = record.get("format")

    def deserializeSingle(self, value):
        """deserialize a single value
        Args:
            value(object): the value to be deserialized according to the typeid

        Returns:
            the deserialized value; values of types without a conversion and
            date values without a usable timestamp are returned unchanged
        """
        # FIXME complete list of types according to
        # https://www.semantic-mediawiki.org/wiki/Help:API:ask
        # Page https://www.semantic-mediawiki.org/wiki/Help:API:ask/Page
        if self.typeid == "_wpg":
            value = value["fulltext"]
            if value:
                value = unquote(value)
        elif self.typeid == "_num":
            # keep fractional numbers as they are - int() would silently
            # truncate them; everything else is converted as before
            if not isinstance(value, float) or value.is_integer():
                value = int(value)
        elif self.typeid == "_dat":
            if "timestamp" in value:
                ts = int(value["timestamp"])
                try:
                    # timezone aware
                    converted = datetime.fromtimestamp(ts, tz=timezone.utc)
                    # naive - for compatibility
                    value = converted.replace(tzinfo=None)
                except (ValueError, OverflowError, OSError) as ex:
                    # fromtimestamp signals out-of-range timestamps with any
                    # of these; the raw value is kept in that case
                    if self.debug:
                        print("Warning timestamp %d is invalid: %s" % (ts, str(ex)))
            else:
                # ignore faulty values
                if self.debug:
                    print("Warning: timestamp missing for value")
        # Text https://www.semantic-mediawiki.org/wiki/Help:API:ask/Text
        # "_txt", "_qty", "_eid" and all other types are passed through as is
        return value

    def deserialize(self, result):
        """deserialize the given result record
        Args:
            result(dict): a single result record dict from the deserialization of the ask query
        Returns:
            object: None for an empty list of values, the single value for a
            one-element list, a list of values otherwise - each deserialized
            according to my typeid
        """
        po = result["printouts"]
        if self.label in po:
            value = po[self.label]
        else:
            # no printout for my label: the result record itself is used
            value = result
        if isinstance(value, list):
            valueList = [self.deserializeSingle(valueItem) for valueItem in value]
            # handle lists
            if not valueList:
                # empty lists => None
                value = None
            elif len(valueList) == 1:
                # lists with one value -> return the item (this unfortunately
                # removes the list property of the value)
                value = valueList[0]
            else:
                # only if there is a "real" list return it
                value = valueList
        else:
            value = self.deserializeSingle(value)

        if PrintRequest.debug:
            print("%s(%s)='%s'" % (self.label, self.typeid, value))
        return value

    def __repr__(self):
        """show my label, key, redi, typeid, mode and format"""
        text = (
            "PrintRequest(label='%s' key='%s' redi='%s' typeid='%s' mode=%d format='%s')"
            % (self.label, self.key, self.redi, self.typeid, self.mode, self.format)
        )
        return text

debug = PrintRequest.debug class-attribute instance-attribute

construct the given print request see https://www.semantic-mediawiki.org/wiki/Serialization_(JSON) :ivar smw: SMW context for this printrequest :ivar label: the label of the printrequest :ivar key: :ivar redi: :ivar typeid: :ivar mode: :ivar format:

__init__(smw, record)

construct me from the given record Args: smw(SMW): the SemanticMediaWiki context of this PrintRequest record(dict): the dict derived from the printrequest json serialization

Source code in wikibot3rd/smw.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(self, smw, record):
    """
    Initialize me from a printrequest record.

    Args:
        smw(SMW): the SemanticMediaWiki context of this PrintRequest
        record(dict): the dict derived from the printrequest json serialization;
            label, key, redi, typeid and mode are read, format is optional
    """
    self.smw = smw
    self.debug = PrintRequest.debug
    if self.debug:
        print(record)
    # mandatory fields are copied as they are
    for fieldName in ("label", "key", "redi", "typeid"):
        setattr(self, fieldName, record[fieldName])
    self.mode = int(record["mode"])
    # a missing format is stored as None
    self.format = record["format"] if "format" in record else None

deserialize(result)

deserialize the given result record Args: result(dict): a single result record dict from the deserialization of the ask query Returns: object: a single deserialized value according to my typeid

Source code in wikibot3rd/smw.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def deserialize(self, result):
    """Deserialize my value from a single result record.

    Args:
        result(dict): a single result record dict from the deserialization of the ask query
    Returns:
        object: None for an empty list of values, the single value for a
        one-element list, a list for several values - each converted with
        deserializeSingle
    """
    printouts = result["printouts"]
    # without a printout for my label the result record itself is used
    raw = printouts[self.label] if self.label in printouts else result
    if not isinstance(raw, list):
        value = self.deserializeSingle(raw)
    else:
        converted = [self.deserializeSingle(entry) for entry in raw]
        if len(converted) == 0:
            # empty lists => None
            value = None
        elif len(converted) == 1:
            # a single value loses its list wrapper
            value = converted[0]
        else:
            # only a "real" list is returned as a list
            value = converted

    if PrintRequest.debug:
        print("%s(%s)='%s'" % (self.label, self.typeid, value))
    return value

deserializeSingle(value)

deserialize a single value Args: value(object): the value to be deserialized according to the typeid

Returns:

Type Description

the deserialized value

Source code in wikibot3rd/smw.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def deserializeSingle(self, value):
    """deserialize a single value
    Args:
        value(object): the value to be deserialized according to the typeid

    Returns:
        the deserialized value; values of types without a conversion and
        date values without a usable timestamp are returned unchanged
    """
    # FIXME complete list of types according to
    # https://www.semantic-mediawiki.org/wiki/Help:API:ask
    # Page https://www.semantic-mediawiki.org/wiki/Help:API:ask/Page
    if self.typeid == "_wpg":
        value = value["fulltext"]
        if value:
            value = unquote(value)
    elif self.typeid == "_num":
        # keep fractional numbers as they are - int() would silently
        # truncate them; everything else is converted as before
        if not isinstance(value, float) or value.is_integer():
            value = int(value)
    elif self.typeid == "_dat":
        if "timestamp" in value:
            ts = int(value["timestamp"])
            try:
                # timezone aware
                converted = datetime.fromtimestamp(ts, tz=timezone.utc)
                # naive - for compatibility
                value = converted.replace(tzinfo=None)
            except (ValueError, OverflowError, OSError) as ex:
                # fromtimestamp signals out-of-range timestamps with any
                # of these; the raw value is kept in that case
                if self.debug:
                    print("Warning timestamp %d is invalid: %s" % (ts, str(ex)))
        else:
            # ignore faulty values
            if self.debug:
                print("Warning: timestamp missing for value")
    # Text https://www.semantic-mediawiki.org/wiki/Help:API:ask/Text
    # "_txt", "_qty", "_eid" and all other types are passed through as is
    return value

QueryResultSizeExceedException

Bases: BaseException

Raised if the results of a query can not completely be queried due to SMW result limits.

Source code in wikibot3rd/smw.py
674
675
676
677
678
679
680
681
682
683
684
685
686
687
class QueryResultSizeExceedException(BaseException):
    """Raised if the results of a query can not completely be queried due to SMW result limits."""

    # NOTE(review): derives from BaseException, so "except Exception" does not
    # catch it - kept as is because callers may rely on that.

    def __init__(
        self,
        result=None,
        message="Query can not completely be queried due to SMW result limits.",
    ):
        """
        Args:
            result(list): the results queried before the limit was hit;
                a new empty list is used when omitted
            message(str): the exception message
        """
        super().__init__(message)
        # a fresh list per instance - a mutable default argument would be
        # shared between all exceptions created without a result
        self.result = [] if result is None else result

    def getResults(self):
        """Returns the queried results before the exception was raised"""
        return self.result

getResults()

Returns the queried results before the exception was raised

Source code in wikibot3rd/smw.py
685
686
687
def getResults(self):
    """
    Give access to the results stored in this exception.

    Returns:
        the value of the result attribute
    """
    queried = self.result
    return queried

SMW

Bases: object

Semantic MediaWiki Access e.g. for ask API see * https://www.semantic-mediawiki.org/wiki/Help:API * https://www.semantic-mediawiki.org/wiki/Serialization_(JSON) * https://www.semantic-mediawiki.org/wiki/Help:API:askargs

adapted from Java SimpleGraph Module https://github.com/BITPlan/com.bitplan.simplegraph/blob/master/simplegraph-smw/src/main/java/com/bitplan/simplegraph/smw/SmwSystem.java :ivar site: the pywikibot site to use for requests :ivar prefix: the path prefix for this site e.g. /wiki/

Source code in wikibot3rd/smw.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
class SMW(object):
    """
    Semantic MediaWiki Access e.g. for ask API
    see
    * https://www.semantic-mediawiki.org/wiki/Help:API
    * https://www.semantic-mediawiki.org/wiki/Serialization_(JSON)
    * https://www.semantic-mediawiki.org/wiki/Help:API:askargs

    adapted from Java SimpleGraph Module
    https://github.com/BITPlan/com.bitplan.simplegraph/blob/master/simplegraph-smw/src/main/java/com/bitplan/simplegraph/smw/SmwSystem.java
    :ivar site: the pywikibot site to use for requests
    :ivar prefix: the path prefix for this site e.g. /wiki/
    """

    def __init__(
        self, site=None, prefix="/", showProgress=False, queryDivision=1, debug=False
    ):
        """
        Constructor
        Args:
            site: the site to use (optional)
            prefix(str): the path prefix for this site e.g. /wiki/
            showProgress(bool): if progress should be shown
            queryDivision(int): Defines the number of subintervals the query is divided into (must be greater than or equal to 1)
            debug(bool): if debugging should be activated - default: False
        """
        self.site = site
        self.prefix = prefix
        self.showProgress = showProgress
        self.queryDivision = queryDivision
        self.splitClause = SplitClause()
        self.debug = debug

    def deserialize(self, rawresult) -> dict:
        """deserialize the given rawresult according to
        https://www.semantic-mediawiki.org/wiki/Serialization_(JSON)

        Args:
            rawresult(dict): contains printrequests and results which need to be merged

        Returns:
            dict: query mainlabel (usually pageTitle) mapped to the corresponding dict of printrequests with label

        Raises:
            Exception: if 'query', 'printrequests' or 'results' is missing
        """
        resultDict = {}
        if rawresult is None:
            return resultDict
        if "query" not in rawresult:
            raise Exception("invalid query result - 'query' missing")
        query = rawresult["query"]
        if "printrequests" not in query:
            raise Exception("invalid query result - 'printrequests' missing")
        printrequests = query["printrequests"]
        if "results" not in query:
            raise Exception("invalid query result - 'results' missing")
        results = query["results"]
        # print requests by label; a later request with the same label wins
        prdict = {}
        for record in printrequests:
            pr = PrintRequest(self, record)
            prdict[pr.label] = pr

        # falsy results (e.g. an empty container) yield an empty dict
        if results:
            for key, record in results.items():
                resultDict[key] = {
                    label: pr.deserialize(record) for label, pr in prdict.items()
                }
        return resultDict

    def fixAsk(self, ask: str):
        """
        fix an ask String to be usable for the API
        Args:
            ask: - a "normal" ask query

        Returns:
             the fixed asked query
        """
        # remove {{ with surrounding white space at beginning
        fixedAsk = re.sub(r"^\s*\{\{", "", ask)
        # remove #ask:
        fixedAsk = re.sub(r"#ask:", "", fixedAsk)
        # remove }} with surrounding white space at end
        fixedAsk = re.sub(r"\}\}\s*$", "", fixedAsk)
        fixedParts = []
        # split by lines (with side effect to remove newlines) - a raw
        # string r"\n" would only match a literal backslash followed by n
        for part in fixedAsk.splitlines():
            #  remove whitespace around part
            part = part.strip()
            # remove whitespace around pipe sign
            part = re.sub(r"\s*\|\s*", "|", part)
            # remove whitespace around assignment =
            part = re.sub(r"\s*=\s*", "=", part)
            # remove whitespace in query parts
            part = re.sub(r"\]\s*\[", "][", part)
            fixedParts.append(part)
        return "".join(fixedParts)

    def getConcept(self, ask):
        """get the concept from the given ask query

        Returns:
            the name inside the first [[Concept:...]] or None if there is none
        """
        m = re.search(r"\[\[Concept:(.+?)\]\]", ask)
        if m:
            return m.group(1)
        return None

    @staticmethod
    def argumentRegex(arg):
        """
        Build the regular expression matching "|<arg>=<digits>" in a query.

        NOTE(review): arg is inserted without re.escape, so regex meta
        characters in it are interpreted as a pattern.
        """
        return r"\| *" + arg + r" *= *\d+(?=\s|\||$)"

    @staticmethod
    def getOuterMostArgumentValueOfQuery(argument, query):
        """
        Extracts the integer value of the given argument from the given query
        Args:
            argument(string): Argument that should be extracted
            query(string): smw query where the given argument is assumed

        Returns:
            Returns integer value of the given argument in the given query.
            If the argument occurs multiple times the last one is returned.
            If it does not occur return None.
        """
        if not argument or not query:
            return None
        args = re.compile(SMW.argumentRegex(argument), re.IGNORECASE).findall(query)
        if not args:
            return None
        # the last occurrence wins
        return int(re.search(r"[0-9]+", args[-1]).group())

__init__(site=None, prefix='/', showProgress=False, queryDivision=1, debug=False)

Constructor Args: site: the site to use (optional) showProgress(bool): if progress should be shown queryDivision(int): Defines the number of subintervals the query is divided into (must be greater than or equal to 1) debug(bool): if debugging should be activated - default: False

Source code in wikibot3rd/smw.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def __init__(
    self, site=None, prefix="/", showProgress=False, queryDivision=1, debug=False
):
    """
    Set up the Semantic MediaWiki access.

    Args:
        site: the site to use (optional)
        prefix(str): kept as self.prefix - default: "/"
        showProgress(bool): if progress should be shown
        queryDivision(int): number of subintervals a query is divided into (must be greater than or equal to 1)
        debug(bool): if debugging should be activated - default: False
    """
    self.site, self.prefix = site, prefix
    self.showProgress, self.debug = showProgress, debug
    self.queryDivision = queryDivision
    # helper that is used to split a query into sub queries
    self.splitClause = SplitClause()

deserialize(rawresult)

deserialize the given rawresult according to https://www.semantic-mediawiki.org/wiki/Serialization_(JSON)

Parameters:

Name Type Description Default
rawresult(dict)

contains printrequests and results which need to be merged

required

Returns:

Name Type Description
dict dict

query mainlabel (usually pageTitle) mapped to the corresponding dict of printrequests with label

Source code in wikibot3rd/smw.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def deserialize(self, rawresult) -> dict:
    """
    Merge the printrequests and results of a raw ask result, see
    https://www.semantic-mediawiki.org/wiki/Serialization_(JSON)

    Args:
        rawresult(dict): raw result with a "query" entry holding "printrequests" and "results"

    Returns:
        dict: key of each result (usually the pageTitle) mapped to a dict with one
        deserialized value per printrequest label; empty if rawresult is None or
        there are no results

    Raises:
        Exception: if "query", "printrequests" or "results" is missing
    """
    if rawresult is None:
        return {}
    if "query" not in rawresult:
        raise Exception("invalid query result - 'query' missing")
    query = rawresult["query"]
    if "printrequests" not in query:
        raise Exception("invalid query result - 'printrequests' missing")
    if "results" not in query:
        raise Exception("invalid query result - 'results' missing")

    # one PrintRequest per requested column, addressed by its label
    requestsByLabel = {}
    for requestRecord in query["printrequests"]:
        printRequest = PrintRequest(self, requestRecord)
        requestsByLabel[printRequest.label] = printRequest

    results = query["results"]
    if not results:
        return {}
    return {
        resultKey: {
            label: printRequest.deserialize(resultRecord)
            for label, printRequest in requestsByLabel.items()
        }
        for resultKey, resultRecord in results.items()
    }

fixAsk(ask)

fix an ask String to be usable for the API Args: ask: - a "normal" ask query

Returns:

Type Description

the fixed asked query

Source code in wikibot3rd/smw.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def fixAsk(self, ask: str):
    """
    fix an ask String to be usable for the API

    The surrounding "{{", "#ask:" and "}}" are removed, the query is split into
    lines and each line is stripped of whitespace around "|", "=" and between
    "]" and "[" before the lines are concatenated again.

    Args:
        ask(str): a "normal" ask query as used in wiki markup

    Returns:
        str: the fixed ask query
    """
    # remove {{ with surrounding white space at beginning
    fixedAsk = re.sub(r"^\s*\{\{", "", ask)
    # remove #ask:
    fixedAsk = re.sub(r"#ask:", "", fixedAsk)
    # remove }} with surrounding white space at end
    fixedAsk = re.sub(r"\}\}\s*$", "", fixedAsk)
    # split by lines (with side effect to remove newlines); besides real
    # newlines the literal two character sequence backslash + n is still
    # accepted as a separator, since that is what was split on before
    parts = re.split(r"\\n|\n", fixedAsk)
    fixedParts = []
    for part in parts:
        # remove whitespace around part
        part = part.strip()
        # remove whitespace around pipe sign
        part = re.sub(r"\s*\|\s*", "|", part)
        # remove whitespace around assignment =
        part = re.sub(r"\s*=\s*", "=", part)
        # remove whitespace in query parts
        part = re.sub(r"\]\s*\[", "][", part)
        fixedParts.append(part)
    return "".join(fixedParts)

getConcept(ask)

get the concept from the given ask query

Source code in wikibot3rd/smw.py
293
294
295
296
297
298
299
def getConcept(self, ask):
    """
    Extract the concept name from the given ask query.

    Args:
        ask(str): the ask query to inspect

    Returns:
        str: the text between "[[Concept:" and the first "]]" that follows it,
        None if the query contains no such clause
    """
    conceptMatch = re.search(r"\[\[Concept:(.+?)\]\]", ask)
    return conceptMatch.group(1) if conceptMatch else None

getOuterMostArgumentValueOfQuery(argument, query) staticmethod

Extracts the integer value of the given argument from the given query Args: argument(string): Argument that should be extracted query(string): smw query where the given argument is assumed

Returns:

Type Description

Returns integer value of the given argument in the given query.

If the argument occurs multiple times the last one is returned.

If it does not occur return None.

Source code in wikibot3rd/smw.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
@staticmethod
def getOuterMostArgumentValueOfQuery(argument, query):
    """
    Extracts the integer value of the given argument from the given query

    Args:
        argument(string): Argument that should be extracted
        query(string): smw query where the given argument is assumed

    Returns:
        Returns integer value of the given argument in the given query.
        If the argument occurs multiple times the last one is returned.
        If it does not occur (or argument/query is empty) return None.
    """
    if not argument or not query:
        return None
    matches = re.findall(SMW.argumentRegex(argument), query, re.IGNORECASE)
    if not matches:
        return None
    # Each match ends with "=<digits>" (optionally padded with blanks), so the
    # value is whatever follows the last "=". Searching for the first digit run
    # instead would pick up digits that belong to the argument name.
    return int(matches[-1].rpartition("=")[2])

SMWBot

Bases: SMW

Semantic MediaWiki access using pywikibot library

Source code in wikibot3rd/smw.py
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
class SMWBot(SMW):
    """
    Semantic MediaWiki access using pywikibot library
    """

    def __init__(self, site=None, prefix="/", showProgress=False, debug=False):
        """
        constructor

        Args:
            site: the site that is handed to pywikibot's Request
            prefix(str): the prefix to use
            showProgress(bool): show progress if true
            debug(bool): set debugging mode if true
        """
        super(SMWBot, self).__init__(
            site, prefix, showProgress=showProgress, debug=debug
        )

    def submit(self, parameters):
        """submit the request with the given parameters

        Args:
            parameters(dict): the parameters to use for the SMW API request
        Returns:
            dict: the submit result
        Raises:
            Exception: if pywikibot reports a TimeoutError
        """
        # Function-local import. sys.modules is keyed by module names, so the
        # former '"Request" in sys.modules' guard never skipped this import and
        # would have left Request unbound if it ever had.
        from pywikibot.data.api import Request

        request = Request(site=self.site, parameters=parameters)
        try:
            return request.submit()
        except pywikibot.exceptions.TimeoutError as toe:
            msg = f"submit for {self.site} failed due to pywikibot TimeoutError"
            raise Exception(msg) from toe

    def info(self):
        """see https://www.semantic-mediawiki.org/wiki/Help:API:smwinfo"""
        parameters = {"action": "smwinfo"}
        return self.submit(parameters)

    def rawquery(self, ask: str, limit=None):
        """
        send a query see https://www.semantic-mediawiki.org/wiki/Help:Inline_queries#Parser_function_.23ask

        Args:
            ask(str): the SMW ASK query as it would be used in MediaWiki markup
            limit(int): accepted for signature compatibility; it is not added to the request

        Returns:
            dict: the submit result for the "ask" action
        """
        # allow usage of original Wiki ask content - strip all non needed parts
        fixedAsk = self.fixAsk(ask)
        # set parameters for request
        parameters = {"action": "ask", "query": fixedAsk}
        return self.submit(parameters)

    def query(self, ask, limit=None):
        """
        send a query and deserialize it

        Args:
            ask(str): the SMW ASK query as it would be used in MediaWiki markup
            limit(int): passed on to rawquery

        Returns:
            dict: the deserialized query result
        """
        rawresult = self.rawquery(ask, limit=limit)
        return self.deserialize(rawresult)

__init__(site=None, prefix='/', showProgress=False, debug=False)

constructor

Parameters:

Name Type Description Default
site
None
prefix(str)

the prefix to use

required
showProgress(bool)

show progress if true

required
debug(bool)

set debugging mode if true

required
Source code in wikibot3rd/smw.py
611
612
613
614
615
616
617
618
619
620
621
622
623
624
def __init__(self, site=None, prefix="/", showProgress=False, debug=False):
    """
    Create the pywikibot based access and hand all settings to the base class.

    Args:
        site: the site to use
        prefix(str): the prefix to use
        showProgress(bool): show progress if true
        debug(bool): set debugging mode if true
    """
    super(SMWBot, self).__init__(
        site=site, prefix=prefix, showProgress=showProgress, debug=debug
    )

info()

see https://www.semantic-mediawiki.org/wiki/Help:API:smwinfo

Source code in wikibot3rd/smw.py
646
647
648
649
def info(self):
    """
    Submit the "smwinfo" action, see
    https://www.semantic-mediawiki.org/wiki/Help:API:smwinfo

    Returns:
        the result of submitting the request
    """
    return self.submit({"action": "smwinfo"})

query(ask, limit=None)

send a query and deserialize it

Source code in wikibot3rd/smw.py
665
666
667
668
669
670
671
def query(self, ask, limit=None):
    """
    Run the given ask query and deserialize the raw result.

    Args:
        ask(str): the ask query
        limit(int): passed on to rawquery

    Returns:
        the deserialized result
    """
    return self.deserialize(self.rawquery(ask, limit=limit))

rawquery(ask, limit=None)

send a query see https://www.semantic-mediawiki.org/wiki/Help:Inline_queries#Parser_function_.23ask

Parameters:

Name Type Description Default
ask(str)

the SMW ASK query as it would be used in MediaWiki markup

required
Source code in wikibot3rd/smw.py
651
652
653
654
655
656
657
658
659
660
661
662
663
def rawquery(self, ask: str, limit=None):
    """
    Send a query, see https://www.semantic-mediawiki.org/wiki/Help:Inline_queries#Parser_function_.23ask

    Args:
        ask(str): the SMW ASK query as it would be used in MediaWiki markup
        limit(int): accepted but not added to the request

    Returns:
        the result of submitting the "ask" request
    """
    # the original wiki markup may be passed in - fixAsk strips the parts that are not needed
    requestParameters = {"action": "ask", "query": self.fixAsk(ask)}
    return self.submit(requestParameters)

submit(parameters)

submit the request with the given parameters Args: parameters(list): the parameters to use for the SMW API request Returns: dict: the submit result

Source code in wikibot3rd/smw.py
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
def submit(self, parameters):
    """submit the request with the given parameters

    Args:
        parameters(dict): the parameters to use for the SMW API request
    Returns:
        dict: the submit result
    Raises:
        Exception: if pywikibot reports a TimeoutError
    """
    # Function-local import. sys.modules is keyed by module names, so the
    # former '"Request" in sys.modules' guard never skipped this import and
    # would have left Request unbound if it ever had.
    from pywikibot.data.api import Request

    request = Request(site=self.site, parameters=parameters)
    try:
        return request.submit()
    except pywikibot.exceptions.TimeoutError as toe:
        msg = f"submit for {self.site} failed due to pywikibot TimeoutError"
        raise Exception(msg) from toe

SMWClient

Bases: SMW

Semantic MediaWiki access using mw client library

Source code in wikibot3rd/smw.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
class SMWClient(SMW):
    """
    Semantic MediaWiki access using mw client library
    """

    def __init__(
        self, site=None, prefix="/", showProgress=False, queryDivision=1, debug=False
    ):
        """
        constructor

        Args:
            site: the site whose raw_api and handle_api_result are used for the requests
            prefix(str): the prefix to use
            showProgress(bool): show progress if true
            queryDivision(int): number of subintervals used by askPartitionQuery;
                with 1 the query is not partitioned
            debug(bool): set debugging mode if true
        """
        super(SMWClient, self).__init__(
            site,
            prefix,
            showProgress=showProgress,
            queryDivision=queryDivision,
            debug=debug,
        )

    def info(self):
        """see https://www.semantic-mediawiki.org/wiki/Help:API:smwinfo"""
        results = self.site.raw_api("smwinfo", http_method="GET")
        self.site.handle_api_result(results)  # raises APIError on error

        return results

    def ask(self, query: str, title: str = None, limit: int = None):
        """
        Ask a query against Semantic MediaWiki.

        API doc: https://semantic-mediawiki.org/wiki/Ask_API

        If queryDivision is 1 the query is executed as is, otherwise it is divided
        into multiple subqueries by askPartitionQuery.

        Args:
            query(str): SMW ask query to be executed
            title(str): title of query (optional, currently not used)
            limit(int): the maximum number of results to be returned -
                please note that SMW configuration will also limit results on the server side

        Returns:
            Generator for retrieving all search results, with each answer as a dictionary.
            If the query is invalid, an APIError is raised. A valid query with zero
            results will not raise any error.

        Examples:

            >>> query = "[[Category:my cat]]|[[Has name::a name]]|?Has property"
            >>> for answer in site.ask(query):
            ...     for title, data in answer.items():
            ...         print(title)
            ...         print(data)
        """
        if limit is None:
            # if limit is not defined (via cmd-line), check if defined in query
            limit = SMW.getOuterMostArgumentValueOfQuery("limit", query)
        if self.queryDivision == 1:
            try:
                results = self.askForAllResults(query, limit)
            except QueryResultSizeExceedException as e:
                print(e)
                # same guard as in askPartitionQuery: there may be no partial results
                results = e.getResults() or []
        else:
            results = self.askPartitionQuery(query, limit)
        yield from results

    def askPartitionQuery(self, query, limit=None):
        """
        Splits the query into multiple subqueries by determining the 'modification date' interval in which all results
        lie. This interval is then divided into subintervals. The number of subintervals is given by queryDivision.
        The results of all subqueries are then returned.

        Args:
            query(string): the SMW inline query to be sent via api
            limit(int): limit of the query
        Returns:
            list: the API responses of all subqueries; if a subquery exceeds the
            result size the responses collected up to that point
        """
        (start, end) = self.getBoundariesOfQuery(query)
        results = []
        if start is None or end is None:
            return results
        if self.showProgress:
            print(f"Start: {start}, End: {end}", file=sys.stderr, flush=True)
        numIntervals = self.queryDivision
        lenSubinterval = (end - start) / numIntervals
        numResults = 0
        for n in range(numIntervals):
            if self.showProgress:
                print(f"Query {n+1}/{numIntervals}:")
            lowerBound = (start + n * lenSubinterval).replace(microsecond=0)
            if n + 1 < numIntervals:
                upperBound = (start + (n + 1) * lenSubinterval).replace(microsecond=0)
            else:
                # the last subinterval ends exactly at the overall upper bound
                upperBound = end
            queryBounds = self.splitClause.queryBounds(lowerBound, upperBound)
            queryParam = f"{query}|{queryBounds}"
            # only ask for as many results as are still missing
            remaining = None if limit is None else limit - numResults
            try:
                tempRes = self.askForAllResults(queryParam, remaining)
            except QueryResultSizeExceedException as e:
                # Too many results for current subinterval n -> print error and return the results up to this point
                print(e)
                results.extend(e.getResults() or [])
                break
            for res in tempRes or []:
                results.append(res)
                query_field = res.get("query")
                if query_field is not None:
                    meta_field = query_field.get("meta")
                    if meta_field is not None:
                        # a missing count is treated as zero results
                        numResults += int(meta_field.get("count") or 0)
            if limit is not None and limit <= numResults:
                if self.showProgress:
                    print(f"Defined limit of {limit} reached - ending querying")
                break
        return results

    def getTimeStampBoundary(self, queryparam, order):
        """
        query according to a DATE e.g. MODIFICATION_DATE in the given order

        Args:
            queryparam(str): the query to which "|order=<order>" is appended
            order(str): the sort order, getBoundariesOfQuery uses "asc" and "desc"

        Returns:
            the date that the split clause deserializes from the query result
        """
        queryparamBoundary = f"{queryparam}|order={order}"
        resultsBoundary = self.site.raw_api(
            "ask", query=queryparamBoundary, http_method="GET"
        )
        self.site.handle_api_result(resultsBoundary)
        deserializedResult = self.deserialize(resultsBoundary)

        deserializedValues = deserializedResult.values()
        date = self.splitClause.deserialize(deserializedValues)
        return date

    def getBoundariesOfQuery(self, query):
        """
        Retrieves the time interval, lower and upper bound, based on the modification date in which the results of the
        given query lie.

        Args:
            query(string): the SMW inline query to be sent via api
        Returns:
            (Datetime, Datetime): Returns the time interval (based on modification date) in which all results of the
            query lie; start and end might be None if an error occurred or the input is invalid
        """
        first = self.splitClause.getFirst()
        queryparam = f"{query}|{first}"
        start = self.getTimeStampBoundary(queryparam, "asc")
        end = self.getTimeStampBoundary(queryparam, "desc")
        return (start, end)

    def askForAllResults(self, query, limit=None, kwargs=None):
        """
        Executes the query until all results are received or the given limit is reached.
        If the SMW results limit is reached before all results are retrieved the QueryResultSizeExceedException is raised.

        Args:
            query(string): the SMW inline query to be sent via api
            limit(int): limit for the query results, None (default) for all results
            kwargs(dict): additional keyword arguments for site.raw_api (optional)
        Returns:
            list: the API responses, one per request
        Raises:
            QueryResultSizeExceedException: Raised if not all results can be retrieved
        """
        # a fresh dict per call instead of a shared mutable default
        kwargs = {} if kwargs is None else kwargs
        offset = 0
        done = False
        count = 0
        res = []
        while not done:
            count += 1
            self.updateProgress(count)

            queryParam = f"{query}|offset={offset}"
            if limit is not None:
                queryParam += f"|limit={limit}"
            results = self.site.raw_api(
                "ask", query=queryParam, http_method="GET", **kwargs
            )
            self.site.handle_api_result(results)  # raises APIError on error
            continueOffset = results.get("query-continue-offset")
            if continueOffset is None:
                done = True
            elif limit is not None and continueOffset >= limit:
                done = True
            elif (
                results.get("query") is not None
                and not results.get("query").get("results")
            ) or continueOffset < offset:
                # continue-offset is set but the result is empty, or the offset went backwards
                self._endProgress(count)
                res.append(results)
                raise QueryResultSizeExceedException(result=res)
            offset = continueOffset
            res.append(results)
        self._endProgress(count)
        return res

    def rawquery(self, askQuery, title=None, limit=None):
        """
        run the given askQuery and return the raw result

        Args:
            askQuery(string): the SMW inline query to be sent via api
            title(string): the title (if any)
            limit(int): the maximum number of records to be retrieved (if any)

        Returns:
            dict: the first response of the ask API with the "results" of all
            further responses merged in, None if there was no response
        """
        fixedAsk = self.fixAsk(askQuery)
        result = None
        for singleResult in self.ask(fixedAsk, title, limit):
            if result is None:
                result = singleResult
                continue
            singleResults = None
            if "query" in singleResult and "results" in singleResult["query"]:
                singleResults = singleResult["query"]["results"]
            if singleResults is None:
                continue
            mergedQuery = result.setdefault("query", {})
            mergedResults = mergedQuery.get("results")
            if mergedResults:
                mergedResults.update(singleResults)
            else:
                # missing or empty so far; an empty value need not be a dict
                # (e.g. an empty list), so it is replaced instead of updated
                mergedQuery["results"] = singleResults
        return result

    def query(self, askQuery: str, title: str = None, limit: int = None) -> dict:
        """
        run query and return the deserialized result

        Args:
            askQuery(string): the SMW inline query to be sent via api
            title(string): the title (if any)
            limit(int): the maximum number of records to be retrieved (if any)

        Return:
            dict: mainlabel as key and value is a dict of the associated property values
        """
        rawresult = self.rawquery(askQuery, title, limit)
        resultDict = self.deserialize(rawresult)
        return resultDict

    def updateProgress(self, count):
        """
        print a progress dot if showProgress is set

        Args:
            count(int): number of the current request; every 80th dot ends the line
        """
        if self.showProgress:
            sep = "\n" if count % 80 == 0 else ""
            print(".", end=sep, flush=True)

    def _endProgress(self, count):
        """
        finish the progress output if showProgress is set

        Args:
            count(int): number of requests for which a dot was printed
        """
        if self.showProgress:
            print("\n" if count % 80 != 0 else "")

ask(query, title=None, limit=None)

Ask a query against Semantic MediaWiki.

API doc: https://semantic-mediawiki.org/wiki/Ask_API

The query is divided into multiple subqueries if the results of the query exceed the defined threshold. If this happens the query is executed multiple times to retrieve all results without passing the threshold.

Parameters:

Name Type Description Default
query(str)

SMW ask query to be executed

required
title(str)

title of query (optional)

required
limit(int)

the maximum number of results to be returned - please note that SMW configuration will also limit results on the server side

required

Returns:

Type Description

Generator for retrieving all search results, with each answer as a dictionary.

If the query is invalid, an APIError is raised. A valid query with zero

results will not raise any error.

Examples:

>>> query = "[[Category:my cat]]|[[Has name::a name]]|?Has property"
>>> for answer in site.ask(query):
>>>     for title, data in answer.items()
>>>         print(title)
>>>         print(data)
Source code in wikibot3rd/smw.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def ask(self, query: str, title: str = None, limit: int = None):
    """
    Ask a query against Semantic MediaWiki.

    API doc: https://semantic-mediawiki.org/wiki/Ask_API

    With a queryDivision of 1 the query is executed as is; otherwise it is
    divided into multiple subqueries by askPartitionQuery so that all results
    can be retrieved without passing the result threshold.

    Args:
        query(str): SMW ask query to be executed
        title(str): title of query (optional)
        limit(int): the maximum number of results to be returned -
            please note that SMW configuration will also limit results on the server side

    Returns:
        Generator for retrieving all search results, with each answer as a dictionary.
        If the query is invalid, an APIError is raised. A valid query with zero
        results will not raise any error.

    Examples:

        >>> query = "[[Category:my cat]]|[[Has name::a name]]|?Has property"
        >>> for answer in site.ask(query):
        ...     for title, data in answer.items():
        ...         print(title)
        ...         print(data)
    """
    if limit is None:
        # no explicit limit given - fall back to a limit defined in the query itself
        limit = SMW.getOuterMostArgumentValueOfQuery("limit", query)
    if self.queryDivision != 1:
        answers = self.askPartitionQuery(query, limit)
    else:
        try:
            answers = self.askForAllResults(query, limit)
        except QueryResultSizeExceedException as sizeExceeded:
            print(sizeExceeded)
            answers = sizeExceeded.getResults()
    yield from answers

askForAllResults(query, limit=None, kwargs={})

Executes the query until all results are received or the given limit is reached. If the SMW results limit is reached before all results are retrieved the QueryResultSizeExceedException is raised. Args: query(string): the SMW inline query to be sent via api limit(int): limit for the query results, None (default) for all results kwargs: Returns: query results Raises: QueryResultSizeExceedException: Raised if not all results can be retrieved

Source code in wikibot3rd/smw.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
def askForAllResults(self, query, limit=None, kwargs=None):
    """
    Executes the query until all results are received or the given limit is reached.
    If the SMW results limit is reached before all results are retrieved the QueryResultSizeExceedException is raised.

    Args:
        query(string): the SMW inline query to be sent via api
        limit(int): limit for the query results, None (default) for all results
        kwargs(dict): additional keyword arguments for site.raw_api (optional)
    Returns:
        list: the API responses, one per request
    Raises:
        QueryResultSizeExceedException: Raised if not all results can be retrieved
    """
    # a fresh dict per call instead of a shared mutable default
    kwargs = {} if kwargs is None else kwargs

    def endShowProgress(c):
        # finish the progress output once querying is over
        if self.showProgress:
            print("\n" if not c % 80 == 0 else "")

    offset = 0
    done = False
    count = 0
    res = []
    while not done:
        count += 1
        if self.showProgress:
            sep = "\n" if count % 80 == 0 else ""
            print(".", end=sep, flush=True)

        queryParam = f"{query}|offset={offset}"
        if limit is not None:
            queryParam += f"|limit={limit}"
        results = self.site.raw_api(
            "ask", query=queryParam, http_method="GET", **kwargs
        )
        self.site.handle_api_result(results)  # raises APIError on error
        continueOffset = results.get("query-continue-offset")
        if continueOffset is None:
            done = True
        elif limit is not None and continueOffset >= limit:
            done = True
        elif (
            results.get("query") is not None
            and not results.get("query").get("results")
        ) or continueOffset < offset:
            # continue-offset is set but the result is empty, or the offset went backwards
            endShowProgress(count)
            res.append(results)
            raise QueryResultSizeExceedException(result=res)
        offset = continueOffset
        res.append(results)
    endShowProgress(count)
    return res

askPartitionQuery(query, limit=None)

Splits the query into multiple subqueries by determining the 'modification date' interval in which all results lie. This interval is then divided into subintervals. The number of subintervals is defined by the user via commandline. The results of all subqueries are then returned. Args: query(string): the SMW inline query to be send via api limit(string): limit of the query Returns: All results of the given query.

Source code in wikibot3rd/smw.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
def askPartitionQuery(self, query, limit=None):
    """
    Splits the query into multiple subqueries by determining the interval (by default the 'modification date'
    interval) in which all results lie. This interval is then divided into ``self.queryDivision`` subintervals
    and one subquery is executed per subinterval. The results of all subqueries are then returned.

    Args:
        query(string): the SMW inline query to be sent via api
        limit(int): the maximum number of results to retrieve (None for no limit)
    Returns:
        list: the raw results of all subqueries that were executed. The list is empty if the
        boundaries of the query could not be determined.
    """
    (start, end) = self.getBoundariesOfQuery(query)
    results = []
    if start is None or end is None:
        return results
    if self.showProgress:
        print(f"Start: {start}, End: {end}", file=sys.stderr, flush=True)
    numIntervals = self.queryDivision
    lenSubinterval = (end - start) / numIntervals

    def calcIntervalBound(n):
        # boundaries are truncated to full seconds
        return (start + n * lenSubinterval).replace(microsecond=0)

    numResults = 0
    for n in range(numIntervals):
        if self.showProgress:
            print(f"Query {n+1}/{numIntervals}:")
        tempLowerBound = calcIntervalBound(n)
        # the last subinterval always ends at the exact upper boundary
        tempUpperBound = calcIntervalBound(n + 1) if (n + 1 < numIntervals) else end
        queryBounds = self.splitClause.queryBounds(tempLowerBound, tempUpperBound)
        queryParam = f"{query}|{queryBounds}"
        # only ask for as many results as are still missing
        remainingLimit = None if limit is None else limit - numResults
        try:
            tempRes = self.askForAllResults(queryParam, remainingLimit)
            if tempRes is not None:
                for res in tempRes:
                    results.append(res)
                    query_field = res.get("query")
                    if query_field is not None:
                        meta_field = query_field.get("meta")
                        if meta_field is not None:
                            count = meta_field.get("count")
                            # a missing count must not abort the whole query
                            if count is not None:
                                numResults += int(count)
        except QueryResultSizeExceedException as e:
            # Too many results for current subinterval n -> print error and return the results upto this point
            print(e)
            partialResults = e.getResults()
            if partialResults:
                results.extend(partialResults)
            break
        if limit is not None and limit <= numResults:
            if self.showProgress:
                print(f"Defined limit of {limit} reached - ending querying")
            break
    return results

getBoundariesOfQuery(query)

Retrieves the time interval (lower and upper bound), based on the modification date, in which the results of the given query lie. Args: query(string): the SMW inline query to be sent via the API. Returns: (Datetime, Datetime): the time interval (based on modification date) in which all results of the query lie; start and end might be None if an error occurred or the input is invalid.

Source code in wikibot3rd/smw.py
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def getBoundariesOfQuery(self, query):
    """
    Determine the lower and upper boundary of the split property for the given query.

    The query is extended with the "first element" clause of my split clause and then
    executed twice: once in ascending and once in descending order.

    Args:
        query(string): the SMW inline query to be sent via api
    Returns:
        tuple: (start, end) as returned by getTimeStampBoundary for the orders "asc" and "desc";
        either value may be None if no boundary could be extracted
    """
    boundaryQuery = "{}|{}".format(query, self.splitClause.getFirst())
    # ascending order yields the lower bound, descending order the upper bound
    lower, upper = (
        self.getTimeStampBoundary(boundaryQuery, direction)
        for direction in ("asc", "desc")
    )
    return (lower, upper)

getTimeStampBoundary(queryparam, order)

query according to a DATE e.g. MODIFICATION_DATE in the given order

Args:

Source code in wikibot3rd/smw.py
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
def getTimeStampBoundary(self, queryparam, order):
    """
    Run the given query in the given sort order and extract the boundary value
    (e.g. a MODIFICATION_DATE) from its result.

    Args:
        queryparam(string): the ask query including the split clause printout
        order(string): the sort order appended to the query as ``order=<order>``
    Returns:
        the value my split clause extracts from the deserialized result
    """
    apiResult = self.site.raw_api(
        "ask",
        query="{}|order={}".format(queryparam, order),
        http_method="GET",
    )
    self.site.handle_api_result(apiResult)
    records = self.deserialize(apiResult)
    return self.splitClause.deserialize(records.values())

info()

see https://www.semantic-mediawiki.org/wiki/Help:API:smwinfo

Source code in wikibot3rd/smw.py
341
342
343
344
345
346
def info(self):
    """
    Call the ``smwinfo`` API module and return its raw result.

    see https://www.semantic-mediawiki.org/wiki/Help:API:smwinfo
    """
    smwInfo = self.site.raw_api("smwinfo", http_method="GET")
    # raises APIError on error
    self.site.handle_api_result(smwInfo)
    return smwInfo

query(askQuery, title=None, limit=None)

Run the query and return the deserialized result as a dict.

Parameters:

Name Type Description Default
askQuery(string)

the SMW inline query to be sent via the API

required
title(string)

the title (if any)

required
limit(int)

the maximum number of records to be retrieved (if any)

required
Return

dict: mainlabel as key and value is a dict of the associated property values

Source code in wikibot3rd/smw.py
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
def query(self, askQuery: str, title: str = None, limit: int = None) -> dict:
    """
    Run the given ask query and deserialize its raw result.

    Args:
        askQuery(string): the SMW inline query to be sent via api
        title(string): the title (if any)
        limit(int): the maximum number of records to be retrieved (if any)

    Return:
        dict: mainlabel as key and value is a dict of the associated property values
    """
    return self.deserialize(self.rawquery(askQuery, title, limit))

rawquery(askQuery, title=None, limit=None)

run the given askQuery and return the raw result

Parameters:

Name Type Description Default
askQuery(string)

the SMW inline query to be sent via the API

required
title(string)

the title (if any)

required
limit(int)

the maximum number of records to be retrieved (if any)

required

Returns:

Name Type Description
dict

the raw query result as returned by the ask API

Source code in wikibot3rd/smw.py
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
def rawquery(self, askQuery, title=None, limit=None):
    """
    Run the given askQuery and merge all partial results into one raw result.

    The first partial result is used as the base (and is modified in place);
    the ``query.results`` entries of every further partial result are merged into it.

    Args:
        askQuery(string): the SMW inline query to be sent via api
        title(string): the title (if any)
        limit(int): the maximum number of records to be retrieved (if any)

    Returns:
        dict: the raw query result as returned by the ask API,
        None if the ask call yields no result at all
    """
    merged = None
    for part in self.ask(self.fixAsk(askQuery), title, limit):
        if merged is None:
            # first partial result becomes the base for merging
            merged = part
            continue
        hasResults = "query" in part and "results" in part["query"]
        partResults = part["query"]["results"] if hasResults else None
        if partResults is None:
            continue
        if "query" not in merged:
            merged["query"] = {}
            merged["query"]["results"] = partResults
        elif "results" in merged["query"]:
            merged["query"]["results"].update(partResults)
        else:
            merged["query"]["results"] = partResults
    return merged

SplitClause

Query parameter to be used for splitting, e.g. Modification date or Creation date; this could potentially be any parameter that is ordered and countable. Currently we assume a parameter of type Date and use Modification date by default.

Source code in wikibot3rd/smw.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class SplitClause:
    """
    Query parameter used for splitting a query into partitions, e.g. Modification date
    or Creation date. Potentially any parameter that is ordered and countable could be used;
    currently a parameter of type Date is assumed and Modification date is the default.
    """

    def __init__(self, name="Modification date", label="mdate"):
        """
        construct me

        Args:
            name(str): the name of the property to split by
            label(str): the label under which the property is printed out in query results
        """
        self.name, self.label = name, label

    def queryBounds(self, lowerBound, upperBound) -> str:
        """
        get the query bounds
        e.g. [[Modification date:: >=2021-01-01T12:00:00]]|[[Modification date:: <=2021-01-02T12:00:00]]

        Args:
            lowerBound(datetime): start of the time interval
            upperBound(datetime): end of the time interval

        Returns:
            Returns the SMW ask part for the boundary
        """
        lowerClause = f"[[{self.name}:: >={lowerBound.isoformat()}]]"
        upperClause = f"[[{self.name}:: <={upperBound.isoformat()}]]"
        return f"{lowerClause}|{upperClause}"

    def getFirst(self) -> str:
        """
        get the ask query part that selects the first element:
        a printout of my property under my label, sorted by my property and limited to one result
        """
        return "?{0}={1}|sort={0}|limit=1".format(self.name, self.label)

    def deserialize(self, values):
        """
        deserialize my query result

        Args:
            values: the values of a deserialized query result - an iterable of records such as
                {'': 'Property:Foaf:knows', 'mdate': datetime.datetime(2020, 11, 28, 17, 40, 36)}

        Returns:
            the entry stored under my label in the first record,
            None if there is no record or the first record has no such entry
        """
        records = list(values)
        if not records:
            return None
        firstRecord = records[0]
        return firstRecord[self.label] if self.label in firstRecord else None

__init__(name='Modification date', label='mdate')

construct me

Source code in wikibot3rd/smw.py
145
146
147
148
149
150
def __init__(self, name="Modification date", label="mdate"):
    """
    construct me

    Args:
        name(str): the name of the property to split by
        label(str): the label under which the property is printed out in query results
    """
    self.name, self.label = name, label

deserialize(values)

deserialize my query result

Source code in wikibot3rd/smw.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def deserialize(self, values):
    """
    deserialize my query result

    Args:
        values: the values of a deserialized query result - an iterable of records such as
            {'': 'Property:Foaf:knows', 'mdate': datetime.datetime(2020, 11, 28, 17, 40, 36)}

    Returns:
        the entry stored under my label in the first record,
        None if there is no record or the first record has no such entry
    """
    records = list(values)
    if not records:
        return None
    firstRecord = records[0]
    return firstRecord[self.label] if self.label in firstRecord else None

getFirst()

get the first element

Source code in wikibot3rd/smw.py
167
168
169
170
171
172
def getFirst(self) -> str:
    """
    get the ask query part that selects the first element:
    a printout of my property under my label, sorted by my property and limited to one result
    """
    return "?{0}={1}|sort={0}|limit=1".format(self.name, self.label)

queryBounds(lowerBound, upperBound)

Get the query bounds, e.g. [[Modification date:: >=2021-01-01T12:00:00]]|[[Modification date:: <=2021-01-02T12:00:00]]

Parameters:

Name Type Description Default
lowerBound(datetime)

start of the time interval

required
upperBound(datetime)

end of the time interval

required

Returns:

Type Description
str

Returns the SMW ask part for the boundary

Source code in wikibot3rd/smw.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def queryBounds(self, lowerBound, upperBound) -> str:
    """
    get the query bounds
    e.g. [[Modification date:: >=2021-01-01T12:00:00]]|[[Modification date:: <=2021-01-02T12:00:00]]

    Args:
        lowerBound(datetime): start of the time interval
        upperBound(datetime): end of the time interval

    Returns:
        Returns the SMW ask part for the boundary
    """
    lowerClause = f"[[{self.name}:: >={lowerBound.isoformat()}]]"
    upperClause = f"[[{self.name}:: <={upperBound.isoformat()}]]"
    return f"{lowerClause}|{upperClause}"

sso

Created on 2024-01-22

@author: wf

with ChatGPT-4 prompting

SSO

A class to implement MediaWiki single sign-on support.

This class provides functionality to connect to a MediaWiki database, verify user credentials, and handle database connections with pooling.

Attributes:

Name Type Description
host str

The host of the MediaWiki database.

database str

The name of the MediaWiki database.

sql_port int

The SQL port for the database connection.

db_username Optional[str]

The database username.

db_password Optional[str]

The database password.

with_pool bool

Flag to determine if connection pooling is used.

timeout float

The timeout for checking SQL port availability.

debug Optional[bool]

Flag to enable debug mode.

Source code in wikibot3rd/sso.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class SSO:
    """
    A class to implement MediaWiki single sign-on support.

    This class provides functionality to connect to a MediaWiki database,
    verify user credentials, and handle database connections with pooling.

    Attributes:
        host (str): The host of the MediaWiki database.
        database (str): The name of the MediaWiki database.
        sql_port (int): The SQL port for the database connection.
        db_username (Optional[str]): The database username.
        db_password (Optional[str]): The database password.
        with_pool (bool): Flag to determine if connection pooling is used.
        timeout (float): The timeout for checking SQL port availability.
        debug (Optional[bool]): Flag to enable debug mode.
    """

    def __init__(
        self,
        host: str,
        database: str,
        sql_port: int = 3306,
        db_username: Optional[str] = None,
        db_password: Optional[str] = None,
        with_pool: bool = True,
        timeout: float = 3,
        debug: Optional[bool] = False,
    ):
        """
        Constructs all the necessary attributes for the SSO object.

        Args:
            host (str): The host of the MediaWiki database.
            database (str): The name of the MediaWiki database.
            sql_port (int): The SQL port for the database connection.
            db_username (Optional[str]): The database username.
            db_password (Optional[str]): The database password.
            with_pool (bool): Flag to determine if connection pooling is used.
            timeout (float): The timeout for checking SQL port availability.
            debug (Optional[bool]): Flag to enable debug mode.
        """
        self.host = host
        self.database = database
        self.sql_port = sql_port
        self.timeout = timeout
        self.db_username = db_username
        self.db_password = db_password
        self.debug = debug
        # the pool is created eagerly; get_pool reads the attributes set above
        self.pool = self.get_pool() if with_pool else None

    def get_pool(self) -> pooling.MySQLConnectionPool:
        """
        Creates a connection pool for the database.

        Returns:
            MySQLConnectionPool: A pool of database connections.
        """
        pool_config = {
            "pool_name": "mypool",
            "pool_size": 2,
            "host": self.host,
            # use the configured port instead of silently falling back to the driver default
            "port": self.sql_port,
            "user": self.db_username,
            "password": self.db_password,
            "database": self.database,
            "raise_on_warnings": True,
        }
        return pooling.MySQLConnectionPool(**pool_config)

    def check_port(self) -> bool:
        """
        Checks if the specified SQL port is accessible on the configured host.

        Returns:
            bool: True if the port is accessible, False otherwise.
        """
        try:
            with socket.create_connection(
                (self.host, self.sql_port), timeout=self.timeout
            ):
                return True
        except socket.error as ex:
            if self.debug:
                print(f"Connection to {self.host} port {self.sql_port} failed: {ex}")
                traceback.print_exc()
            return False

    def verify_password(self, password: str, hash_value: str) -> bool:
        """
        Verifies a password against a stored hash value.

        The hash value is expected to consist of seven ":"-separated fields:
        an ignored prefix, the literal "pbkdf2", the hash algorithm, the iteration
        count, the derived key length, the base64 encoded salt and the base64
        encoded hash.

        Args:
            password (str): The password to verify.
            hash_value (str): The stored hash value to compare against.

        Returns:
            bool: True if the password matches the hash value, False otherwise.

        Raises:
            ValueError: if the hash value does not have seven fields, is not a
                pbkdf2 hash, has a non-numeric iteration count or key length,
                or uses a hash algorithm hashlib does not provide.
        """
        # function-scope import: only this method needs hmac
        import hmac

        parts = hash_value.split(":")
        if len(parts) != 7:
            raise ValueError("Invalid hash format")

        (
            _,
            pbkdf2_indicator,
            hash_algorithm,
            iterations,
            key_length,
            salt,
            hashed_password,
        ) = parts

        if pbkdf2_indicator != "pbkdf2":
            raise ValueError("verify_password expects pbkdf2 hashes")

        iterations = int(iterations)
        # derive exactly as many bytes as were stored; None selects the digest size
        dklen = int(key_length) if key_length else None

        def fix_base64_padding(string: str) -> str:
            # the stored values may lack the trailing "=" padding b64decode requires
            return string + "=" * (-len(string) % 4)

        salt = base64.b64decode(fix_base64_padding(salt))
        hashed_password = base64.b64decode(fix_base64_padding(hashed_password))

        if hash_algorithm not in hashlib.algorithms_available:
            raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}")

        new_hash = hashlib.pbkdf2_hmac(
            hash_algorithm, password.encode("utf-8"), salt, iterations, dklen
        )
        # constant-time comparison avoids leaking how many leading bytes match
        return hmac.compare_digest(new_hash, hashed_password)

    def get_user(self, username: str) -> Optional[User]:
        """
        Retrieve details of a user by username.

        The lookup is done with the first character of the username upper-cased.

        Args:
            username (str): The username to look up.

        Returns:
            Optional[User]: a User object if found, otherwise None
            (also for an empty username).
        """
        if not username:
            # nothing to look up - avoid an IndexError on username[0]
            return None
        mw_username = username[0].upper() + username[1:]
        user_record = self.fetch_user_data_from_database(mw_username)
        if user_record:
            return User(
                id=user_record["user_id"],
                name=user_record["user_name"],
                real_name=user_record["user_real_name"],
                password=user_record["user_password"],
                email=user_record["user_email"],
                touched=user_record["user_touched"],
                editcount=user_record["user_editcount"],
                is_admin=user_record["is_sysop"] > 0,
            )
        return None

    def query(self, connection, sql_query, params) -> Optional[Dict]:
        """
        Execute the given SQL query and return the first record.

        Args:
            connection: the database connection to use
            sql_query (str): the SQL query with placeholders
            params: the parameters to bind to the placeholders

        Returns:
            Optional[Dict]: the first record as returned by the dictionary cursor's fetchone()
        """
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute(sql_query, params)
            return cursor.fetchone()
        finally:
            # close the cursor even if execute/fetchone fails
            cursor.close()

    def fetch_user_data_from_database(self, mw_username: str) -> Optional[dict]:
        """
        Fetch user data from the database.

        Database errors are not propagated: they are printed in debug mode
        and lead to a None result.

        Args:
            mw_username(str): the Mediawiki username

        Returns:
            Optional[dict]: the user record including an ``is_sysop`` count,
            None if no record was found or an error occurred
        """
        user_record = None
        # pre-bind so the finally clause works even if acquiring the connection fails
        connection = None
        try:
            connection = (
                self.pool.get_connection()
                if self.pool
                else mysql.connector.connect(
                    host=self.host,
                    port=self.sql_port,
                    user=self.db_username,
                    password=self.db_password,
                    database=self.database,
                )
            )
            # JOIN query: Fetch user data and sysop status
            sql_query = """
            SELECT u.*, 
                   COUNT(ug.ug_group) AS is_sysop
            FROM `user` u
            LEFT JOIN `user_groups` ug ON u.user_id = ug.ug_user AND ug.ug_group = 'sysop'
            WHERE u.user_name = %s
            GROUP BY u.user_id
            """
            user_record = self.query(connection, sql_query, (mw_username,))
        except Exception as ex:
            if self.debug:
                print(f"Database error: {ex}")
                traceback.print_exc()
        finally:
            if connection and connection.is_connected():
                connection.close()
        return user_record

    def check_credentials(self, username: str, password: str) -> bool:
        """
        Checks the validity of MediaWiki username and password.

        Args:
            username (str): The MediaWiki username.
            password (str): The password to verify.

        Returns:
            bool: True if the credentials are valid, False otherwise.
        """
        is_valid = False
        user = self.get_user(username)
        if user:
            stored_hash = user.password
            is_valid = self.verify_password(password, stored_hash)
        elif self.debug:
            print(f"Username {username} not found in {self.database} on {self.host}")
        return is_valid

__init__(host, database, sql_port=3306, db_username=None, db_password=None, with_pool=True, timeout=3, debug=False)

Constructs all the necessary attributes for the SSO object.

Parameters:

Name Type Description Default
host str

The host of the MediaWiki database.

required
database str

The name of the MediaWiki database.

required
sql_port int

The SQL port for the database connection.

3306
db_username Optional[str]

The database username.

None
db_password Optional[str]

The database password.

None
with_pool bool

Flag to determine if connection pooling is used.

True
timeout float

The timeout for checking SQL port availability.

3
debug Optional[bool]

Flag to enable debug mode.

False
Source code in wikibot3rd/sso.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def __init__(
    self,
    host: str,
    database: str,
    sql_port: int = 3306,
    db_username: Optional[str] = None,
    db_password: Optional[str] = None,
    with_pool: bool = True,
    timeout: float = 3,
    debug: Optional[bool] = False,
):
    """
    Initialize the SSO object with the database connection settings.

    Args:
        host (str): The host of the MediaWiki database.
        database (str): The name of the MediaWiki database.
        sql_port (int): The SQL port for the database connection.
        db_username (Optional[str]): The database username.
        db_password (Optional[str]): The database password.
        with_pool (bool): Flag to determine if connection pooling is used.
        timeout (float): The timeout for checking SQL port availability.
        debug (Optional[bool]): Flag to enable debug mode.
    """
    # where to connect
    self.host, self.database, self.sql_port = host, database, sql_port
    self.timeout = timeout
    # credentials
    self.db_username, self.db_password = db_username, db_password
    self.debug = debug
    # the pool must be created last since get_pool is called on the configured instance
    if with_pool:
        self.pool = self.get_pool()
    else:
        self.pool = None

check_credentials(username, password)

Checks the validity of MediaWiki username and password.

Parameters:

Name Type Description Default
username str

The MediaWiki username.

required
password str

The password to verify.

required

Returns:

Name Type Description
bool bool

True if the credentials are valid, False otherwise.

Source code in wikibot3rd/sso.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def check_credentials(self, username: str, password: str) -> bool:
    """
    Check whether the given password belongs to the given MediaWiki user.

    Args:
        username (str): The MediaWiki username.
        password (str): The password to verify.

    Returns:
        bool: True if the credentials are valid, False otherwise
        (including the case that the user is not found).
    """
    user = self.get_user(username)
    if user:
        # compare against the hashed password stored for the user
        return self.verify_password(password, user.password)
    if self.debug:
        print(f"Username {username} not found in {self.database} on {self.host}")
    return False

check_port()

Checks if the specified SQL port is accessible on the configured host.

Returns:

Name Type Description
bool bool

True if the port is accessible, False otherwise.

Source code in wikibot3rd/sso.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def check_port(self) -> bool:
    """
    Try to open a TCP connection to the configured host and SQL port.

    The connection attempt uses the configured timeout; the connection is
    closed again immediately. In debug mode a failure is printed together
    with its traceback.

    Returns:
        bool: True if the port is accessible, False otherwise.
    """
    target = (self.host, self.sql_port)
    try:
        with socket.create_connection(target, timeout=self.timeout):
            reachable = True
    except socket.error as ex:
        reachable = False
        if self.debug:
            print(f"Connection to {self.host} port {self.sql_port} failed: {ex}")
            traceback.print_exc()
    return reachable

fetch_user_data_from_database(mw_username)

Fetch user data from the database.

Parameters:

Name Type Description Default
mw_username(str)

the Mediawiki username

required
Source code in wikibot3rd/sso.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def fetch_user_data_from_database(self, mw_username: str) -> Optional[dict]:
    """
    Fetch user data from the database.

    A connection is taken from the pool if there is one, otherwise a new
    connection is opened. Database errors are not propagated: they are
    printed in debug mode and lead to a None result.

    Args:
        mw_username(str): the Mediawiki username

    Returns:
        Optional[dict]: the user record including an ``is_sysop`` count,
        None if no record was found or an error occurred
    """
    user_record = None
    # pre-bind so the finally clause works even if acquiring the connection fails
    connection = None
    try:
        connection = (
            self.pool.get_connection()
            if self.pool
            else mysql.connector.connect(
                host=self.host,
                # use the configured port instead of the driver default
                port=self.sql_port,
                user=self.db_username,
                password=self.db_password,
                database=self.database,
            )
        )
        # JOIN query: Fetch user data and sysop status
        sql_query = """
        SELECT u.*, 
               COUNT(ug.ug_group) AS is_sysop
        FROM `user` u
        LEFT JOIN `user_groups` ug ON u.user_id = ug.ug_user AND ug.ug_group = 'sysop'
        WHERE u.user_name = %s
        GROUP BY u.user_id
        """
        user_record = self.query(connection, sql_query, (mw_username,))
    except Exception as ex:
        if self.debug:
            print(f"Database error: {ex}")
            traceback.print_exc()
    finally:
        if connection and connection.is_connected():
            connection.close()
    return user_record

get_pool()

Creates a connection pool for the database.

Returns:

Name Type Description
MySQLConnectionPool MySQLConnectionPool

A pool of database connections.

Source code in wikibot3rd/sso.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def get_pool(self) -> pooling.MySQLConnectionPool:
    """
    Creates a connection pool for the database.

    The pool is named "mypool", holds two connections and connects with the
    configured host, port, credentials and database.

    Returns:
        MySQLConnectionPool: A pool of database connections.
    """
    pool_config = {
        "pool_name": "mypool",
        "pool_size": 2,
        "host": self.host,
        # use the configured port instead of silently falling back to the driver default
        "port": self.sql_port,
        "user": self.db_username,
        "password": self.db_password,
        "database": self.database,
        "raise_on_warnings": True,
    }
    return pooling.MySQLConnectionPool(**pool_config)

get_user(username)

Retrieve details of a user by username. Returns a User object if found, otherwise None.

Source code in wikibot3rd/sso.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def get_user(self, username: str) -> Optional[User]:
    """
    Retrieve details of a user by username.

    The lookup is done with the first character of the username upper-cased.

    Args:
        username (str): The username to look up.

    Returns:
        Optional[User]: a User object if found, otherwise None
        (also for an empty username).
    """
    if not username:
        # nothing to look up - avoid an IndexError on username[0]
        return None
    mw_username = username[0].upper() + username[1:]
    user_record = self.fetch_user_data_from_database(mw_username)
    if user_record:
        return User(
            id=user_record["user_id"],
            name=user_record["user_name"],
            real_name=user_record["user_real_name"],
            password=user_record["user_password"],
            email=user_record["user_email"],
            touched=user_record["user_touched"],
            editcount=user_record["user_editcount"],
            is_admin=user_record["is_sysop"] > 0,
        )
    return None

verify_password(password, hash_value)

Verifies a password against a stored hash value.

Parameters:

Name Type Description Default
password str

The password to verify.

required
hash_value str

The stored hash value to compare against.

required

Returns:

Name Type Description
bool bool

True if the password matches the hash value, False otherwise.

Source code in wikibot3rd/sso.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def verify_password(self, password: str, hash_value: str) -> bool:
    """
    Verifies a password against a stored hash value.

    The hash value is expected to consist of seven ":"-separated fields:
    an ignored prefix, the literal "pbkdf2", the hash algorithm, the iteration
    count, the derived key length, the base64 encoded salt and the base64
    encoded hash.

    Args:
        password (str): The password to verify.
        hash_value (str): The stored hash value to compare against.

    Returns:
        bool: True if the password matches the hash value, False otherwise.

    Raises:
        ValueError: if the hash value does not have seven fields, is not a
            pbkdf2 hash, has a non-numeric iteration count or key length,
            or uses a hash algorithm hashlib does not provide.
    """
    # function-scope import: only this function needs hmac
    import hmac

    parts = hash_value.split(":")
    if len(parts) != 7:
        raise ValueError("Invalid hash format")

    (
        _,
        pbkdf2_indicator,
        hash_algorithm,
        iterations,
        key_length,
        salt,
        hashed_password,
    ) = parts

    if pbkdf2_indicator != "pbkdf2":
        raise ValueError("verify_password expects pbkdf2 hashes")

    iterations = int(iterations)
    # derive exactly as many bytes as were stored; None selects the digest size
    dklen = int(key_length) if key_length else None

    def fix_base64_padding(string: str) -> str:
        # the stored values may lack the trailing "=" padding b64decode requires
        return string + "=" * (-len(string) % 4)

    salt = base64.b64decode(fix_base64_padding(salt))
    hashed_password = base64.b64decode(fix_base64_padding(hashed_password))

    if hash_algorithm not in hashlib.algorithms_available:
        raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}")

    new_hash = hashlib.pbkdf2_hmac(
        hash_algorithm, password.encode("utf-8"), salt, iterations, dklen
    )
    # constant-time comparison avoids leaking how many leading bytes match
    return hmac.compare_digest(new_hash, hashed_password)

User

Mediawiki user details

Source code in wikibot3rd/sso.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@lod_storable
class User:
    """
    Details of a Mediawiki user as read from the wiki's user table.
    """

    id: int
    name: str
    real_name: str
    password: str  # hashed password
    email: str
    touched: str
    editcount: int
    is_admin: Optional[bool] = False

    def __post_init__(self):
        """
        Normalize fields that may arrive as bytes to strings.
        """
        # Safely convert binary fields to strings if they are not already
        for field_name in ("password", "touched"):
            raw_value = getattr(self, field_name)
            if isinstance(raw_value, bytes):
                setattr(
                    self, field_name, raw_value.decode("utf-8", errors="replace")
                )

version

Created on 2022-03-24

@author: wf

Version

Bases: object

Version handling for py-3rdparty-mediawiki

Source code in wikibot3rd/version.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Version(object):
    """
    Version and project metadata for py-3rdparty-mediawiki
    """

    # project identity
    name = "py-3rdparty-mediawiki"
    version = wikibot3rd.__version__
    description = "Wrapper for mwclient with improvements for 3rd party wikis"
    authors = "Wolfgang Fahl, Tim Holzheim"

    # creation date and date of the last update
    date = "2020-10-31"
    updated = "2024-07-27"

    # project links
    cm_url = "https://github.com/WolfgangFahl/py-3rdparty-mediawiki"
    chat_url = "https://github.com/WolfgangFahl/py-3rdparty-mediawiki/discussions"
    doc_url = "https://wiki.bitplan.com/index.php/Py-3rdparty-mediawiki"

    license = (
        "Copyright 2020-2024 contributors. All rights reserved.\n"
        "  Licensed under the Apache License 2.0\n"
        "  http://www.apache.org/licenses/LICENSE-2.0\n"
        '  Distributed on an "AS IS" basis without warranties\n'
        "  or conditions of any kind, either express or implied."
    )
    longDescription = (
        f"{name} version {version}\n"
        f"{description}\n"
        f"  Created by {authors} on {date} last updated {updated}"
    )

wiki

Created on 2020-11-05

@author: wf

Wiki

Bases: object

common interface for WikiBot and WikiClient

Source code in wikibot3rd/wiki.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Wiki(object):
    """
    common interface for WikiBot and WikiClient
    """

    def __init__(self, wikiUser, debug=False):
        """
        Constructor

        Args:
            wikiUser(WikiUser): the wiki user to initialize me for
            debug(bool): debug flag, stored as self.debug
        """
        self.debug = debug
        self.wikiUser = wikiUser

    def __str__(self):
        """
        format wiki id, concrete class name, user and url as a single line
        """
        user = self.wikiUser
        kind = type(self).__name__
        # !s mirrors the %s conversion so that None values are rendered as text
        return f"{user.wikiId!s:>20}({kind!s:>10}): {user.user!s:>15} {user.url!s}"

__init__(wikiUser, debug=False)

Constructor

Parameters:

Name Type Description Default
wikiUser(WikiUser)

the wiki user to initialize me for

required
Source code in wikibot3rd/wiki.py
13
14
15
16
17
18
19
20
21
def __init__(self, wikiUser, debug=False):
    """
    Constructor

    Args:
        wikiUser(WikiUser): the wiki user to initialize me for
        debug(bool): debug flag, stored as self.debug
    """
    self.debug = debug
    self.wikiUser = wikiUser

__str__()

return a string representation of myself

Source code in wikibot3rd/wiki.py
23
24
25
26
27
28
29
30
def __str__(self):
    """
    format wiki id, concrete class name, user and url as a single line
    """
    user = self.wikiUser
    kind = type(self).__name__
    # !s mirrors the %s conversion so that None values are rendered as text
    return f"{user.wikiId!s:>20}({kind!s:>10}): {user.user!s:>15} {user.url!s}"

wikiaction

Created on 02.02.2021

@author: wf

WikiAction

Bases: object

perform an action on the given semantic media wiki

Source code in wikibot3rd/wikiaction.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class WikiAction(object):
    """
    perform an action on the given semantic media wiki
    """

    def __init__(self, smw, debug=False):
        """
        Constructor

        Args:
            smw: the semantic media wiki access object - its query method is used
            debug(bool): if True getLambdaAction prints the requested names
        """
        self.debug = debug
        self.smw = smw
        # the source code records are fetched once during construction
        self.sourceCodeResult = self.getSourceCodes()

    def getSourceCodes(self):
        """
        query the wiki for the entries of the Sourcecode concept

        Returns:
            the result of self.smw.query for the ask query
        """
        ask_lines = [
            "{{#ask: [[Concept:Sourcecode]]",
            "|mainlabel=Sourcecode",
            "| ?Sourcecode id = id",
            "| ?Sourcecode lang = lang",
            "| ?Sourcecode text = text",
            "| ?Sourcecode author = author",
            "| ?Sourcecode since = since",
            "| ?Sourcecode url = url",
            "}}",
        ]
        return self.smw.query("\n".join(ask_lines))

    def getLambdaAction(self, name, queryName, actionName):
        """
        get an action from the result with the given query and actionName

        Args:
            name: the name handed to the LambdaAction
            queryName: key of the source code record used for the Query
            actionName: key of the source code record used for the Code

        Returns:
            LambdaAction: the action combining the query and the code
        """
        if self.debug:
            print(f"lambdaAction with query: {queryName} and action: {actionName}")
        records = self.sourceCodeResult
        query_record = records[queryName]
        query = Query(
            name=query_record["id"],
            query=query_record["text"],
            lang=query_record["lang"],
        )
        code_record = records[actionName]
        code = Code(
            name=code_record["id"],
            text=code_record["text"],
            lang=code_record["lang"],
        )
        return LambdaAction(name, query=query, code=code)

__init__(smw, debug=False)

Constructor

Source code in wikibot3rd/wikiaction.py
17
18
19
20
21
22
23
def __init__(self, smw, debug=False):
    """
    Constructor

    Args:
        smw: the semantic media wiki access object
        debug(bool): debug flag, stored as self.debug
    """
    self.debug = debug
    self.smw = smw
    # the source code records are fetched once during construction
    self.sourceCodeResult = self.getSourceCodes()

getLambdaAction(name, queryName, actionName)

get an action from the result with the given query and actionName

Source code in wikibot3rd/wikiaction.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def getLambdaAction(self, name, queryName, actionName):
    """
    get an action from the result with the given query and actionName

    Args:
        name: the name handed to the LambdaAction
        queryName: key of the source code record used for the Query
        actionName: key of the source code record used for the Code

    Returns:
        LambdaAction: the action combining the query and the code
    """
    if self.debug:
        print(f"lambdaAction with query: {queryName} and action: {actionName}")
    records = self.sourceCodeResult
    query_record = records[queryName]
    query = Query(
        name=query_record["id"],
        query=query_record["text"],
        lang=query_record["lang"],
    )
    code_record = records[actionName]
    code = Code(
        name=code_record["id"],
        text=code_record["text"],
        lang=code_record["lang"],
    )
    return LambdaAction(name, query=query, code=code)

wikibackup

Created on 2020-12-05

@author: wf

wikibot

Created on 2020-03-24

@author: wf

WikiBot

Bases: Wiki

WikiBot

Source code in wikibot3rd/wikibot.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
class WikiBot(Wiki):
    """
    WikiBot

    Wiki access based on pywikibot. A pywikibot family file is created
    on demand for the wiki user's wiki.
    """

    @staticmethod
    def getBots(limit=None, name=None, valueExpr=None):
        """
        get the WikiBots for the known wiki users, optionally filtered

        Args:
            limit(int): maximum number of bots to return - None for no limit
            name(str): name of the WikiUser attribute to filter by - None for no filter
            valueExpr(str): regular expression searched in the attribute's value

        Returns:
            dict: map of wikiId to WikiBot
        """
        bots = {}
        for wikiUser in WikiUser.getWikiUsers().values():
            if name is not None:
                value = wikiUser.__dict__[name]
                # an unset attribute can not match - handing None to
                # re.search would raise a TypeError
                if value is None or re.search(valueExpr, value) is None:
                    continue
            bots[wikiUser.wikiId] = WikiBot(wikiUser)
            if limit is not None and len(bots) >= limit:
                break
        return bots

    @staticmethod
    def ofWikiId(wikiId, lenient=True, debug=False):
        """
        create a WikiBot for the given wikiId

        Args:
            wikiId(str): the id of the wiki
            lenient(bool): passed on to WikiUser.ofWikiId
            debug(bool): True if debug mode should be on

        Returns:
            WikiBot: the bot for the wiki user of the given wikiId
        """
        wikiUser = WikiUser.ofWikiId(wikiId, lenient=lenient)
        wikibot = WikiBot(wikiUser, debug=debug)
        return wikibot

    @staticmethod
    def ofWikiUser(wikiUser):
        """
        create a WikiBot for the given wiki user

        Args:
            wikiUser(WikiUser): the wiki user

        Returns:
            WikiBot: the bot for the given wiki user
        """
        wikibot = WikiBot(wikiUser)
        return wikibot

    def __init__(
        self, wikiUser, debug: bool = False, withLogin: bool = False, maxRetries=2
    ):
        """
        Constructor

        Args:
            wikiUser(WikiUser): the wiki user to initialize me for
            debug(bool): True if debug mode should be on
            withLogin(bool): True if init should automatically login
            maxRetries(int): value assigned to pywikibot.config.max_retries

        Raises:
            Exception: if the wiki user has no url
        """
        pywikibot.config.max_retries = maxRetries
        super(WikiBot, self).__init__(wikiUser, debug)
        self.family = wikiUser.wikiId.replace("-", "").replace("_", "")
        # check before unescaping so that a missing (None) url is reported
        # with the intended message instead of an AttributeError
        if not wikiUser.url:
            raise Exception("url is missing for %s" % wikiUser.wikiId)
        self.url = wikiUser.url.replace("\\:", ":")

        self.scriptPath = wikiUser.scriptPath
        self.version = wikiUser.version
        o = urlparse(self.url)
        self.scheme = o.scheme
        self.netloc = o.netloc
        # the effective script path is the url's path plus the configured script path
        self.scriptPath = o.path + self.scriptPath
        self.checkFamily()
        if withLogin:
            self.login()

    def register_family_file(self, familyName: str, famfile: str):
        """
        register the family file

        Args:
            familyName(str): the familyName to register
            famfile(str): the path to the family file
        """
        # config2.register_family_file is deprecated - the family file
        # mapping of the pywikibot config is set directly instead
        pywikibot.config.family_files[familyName] = famfile

    def checkFamily(self):
        """
        check if a valid family file exists and if not create it

        The family file is registered, the user name is configured if there
        is one and self.site is set to the pywikibot Site of the family.

        watch out for https://stackoverflow.com/questions/76612838/how-to-work-with-custom-wikibase-using-pywikibot
        8.2 changes that might break old family files
        """
        iniFile = WikiUser.iniFilePath(self.wikiUser.wikiId)
        famfile = iniFile.replace(".ini", ".py")
        if not isfile(famfile):
            print("creating family file %s" % famfile)
            template = """# -*- coding: utf-8  -*-
from pywikibot import family

class Family(family.Family):
    name = '%s'
    langs = {
        'en': '%s',
    }
    def scriptpath(self, code):
       return '%s'

    def isPublic(self):
        return %s   

    def version(self, code):
        return "%s"  # The MediaWiki version used. Very important in most cases. (contrary to documentation)   

    def protocol(self, code):
       return '%s'
"""
            mw_version = self.wikiUser.version.lower().replace("mediawiki ", "")
            ispublic = "False" if self.wikiUser.user is not None else "True"
            code = template % (
                self.family,
                self.netloc,
                self.scriptPath,
                ispublic,
                mw_version,
                self.scheme,
            )
            # the generated file declares utf-8 in its coding line, so it
            # must not be written with a platform dependent default encoding
            with open(famfile, "w", encoding="utf-8") as py_file:
                py_file.write(code)
        self.register_family_file(self.family, famfile)
        if self.wikiUser.user:
            pywikibot.config.usernames[self.family]["en"] = self.wikiUser.user
        # config2.authenticate[self.netloc] = (self.user,self.getPassword())
        self.site = pywikibot.Site("en", self.family)

    def login(self):
        """
        login to the wiki with the credentials of my wiki user

        Raises:
            Exception: if the wiki user has no user name
        """
        if self.wikiUser.user:
            # needs patch as outlined in https://phabricator.wikimedia.org/T248471
            # self.site.login(password=self.wikiUser.getPassword())
            lm = pywikibot.login.ClientLoginManager(
                password=self.wikiUser.getPassword(),
                site=self.site,
                user=self.wikiUser.user,
            )
            lm.login()
        else:
            raise Exception("wikiUser is not set")

    def getWikiMarkup(self, pageTitle):
        """
        get the wiki markup code (text) for the given page Title

        Args:
            pageTitle(str): the title of the page to retrieve

        Returns:
            str: the wiki markup code for the page
        """
        page = self.getPage(pageTitle)
        markup = page.text
        return markup

    def getHtml(self, pageTitle):
        """
        get the HTML code for the given page Title

        Args:
            pageTitle(str): the title of the page to retrieve

        Returns:
            str: the rendered HTML code for the page
        """
        page = self.getPage(pageTitle)
        html = page.get_parsed_page()
        return html

    def getPage(self, pageTitle):
        """
        get the page with the given title

        Args:
            pageTitle(str): the title of the page to retrieve

        Returns:
            Page: the pywikibot page for the given pageTitle
        """
        page = pywikibot.Page(self.site, pageTitle)
        return page

    def savePage(self, pageTitle, pageContent, pageSummary):
        """
        save a page with the given pageTitle, pageContent and pageSummary

        Args:
            pageTitle(str): the title of the page to save
            pageContent(str): the new text of the page
            pageSummary(str): the summary passed to the page's save call
        """
        newPage = self.getPage(pageTitle)
        newPage.text = pageContent
        newPage.save(pageSummary)

__init__(wikiUser, debug=False, withLogin=False, maxRetries=2)

Constructor

Parameters:

Name Type Description Default
wikiUser(WikiUser)

the wiki user to initialize me for

required
debug(bool)

True if debug mode should be on

False
withLogin(bool)

True if init should automatically login

False
Source code in wikibot3rd/wikibot.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(
    self, wikiUser, debug: bool = False, withLogin: bool = False, maxRetries=2
):
    """
    Constructor

    Args:
        wikiUser(WikiUser): the wiki user to initialize me for
        debug(bool): True if debug mode should be on
        withLogin(bool): True if init should automatically login
        maxRetries(int): value assigned to pywikibot.config.max_retries

    Raises:
        Exception: if the wiki user has no url
    """
    pywikibot.config.max_retries = maxRetries
    super(WikiBot, self).__init__(wikiUser, debug)
    self.family = wikiUser.wikiId.replace("-", "").replace("_", "")
    # check before unescaping so that a missing (None) url is reported
    # with the intended message instead of an AttributeError
    if not wikiUser.url:
        raise Exception("url is missing for %s" % wikiUser.wikiId)
    self.url = wikiUser.url.replace("\\:", ":")

    self.scriptPath = wikiUser.scriptPath
    self.version = wikiUser.version
    o = urlparse(self.url)
    self.scheme = o.scheme
    self.netloc = o.netloc
    # the effective script path is the url's path plus the configured script path
    self.scriptPath = o.path + self.scriptPath
    self.checkFamily()
    if withLogin:
        self.login()

checkFamily()

check if a valid family file exists and if not create it

watch out for https://stackoverflow.com/questions/76612838/how-to-work-with-custom-wikibase-using-pywikibot 8.2 changes that might break old family files

Source code in wikibot3rd/wikibot.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
    def checkFamily(self):
        """
        check if a valid family file exists and if not create it

        watch out for https://stackoverflow.com/questions/76612838/how-to-work-with-custom-wikibase-using-pywikibot
        8.2 changes that might break old family files
        """
        iniFile = WikiUser.iniFilePath(self.wikiUser.wikiId)
        famfile = iniFile.replace(".ini", ".py")
        if not isfile(famfile):
            print("creating family file %s" % famfile)
            template = """# -*- coding: utf-8  -*-
from pywikibot import family

class Family(family.Family):
    name = '%s'
    langs = {
        'en': '%s',
    }
    def scriptpath(self, code):
       return '%s'

    def isPublic(self):
        return %s   

    def version(self, code):
        return "%s"  # The MediaWiki version used. Very important in most cases. (contrary to documentation)   

    def protocol(self, code):
       return '%s'
"""
            mw_version = self.wikiUser.version.lower().replace("mediawiki ", "")
            ispublic = "False" if self.wikiUser.user is not None else "True"
            code = template % (
                self.family,
                self.netloc,
                self.scriptPath,
                ispublic,
                mw_version,
                self.scheme,
            )
            with open(famfile, "w") as py_file:
                py_file.write(code)
        self.register_family_file(self.family, famfile)
        if self.wikiUser.user:
            pywikibot.config.usernames[self.family]["en"] = self.wikiUser.user
        # config2.authenticate[self.netloc] = (self.user,self.getPassword())
        self.site = pywikibot.Site("en", self.family)

getHtml(pageTitle)

get the HTML code for the given page Title

Parameters:

Name Type Description Default
pageTitle(str)

the title of the page to retrieve

required

Returns:

Name Type Description
str

the rendered HTML code for the page

Source code in wikibot3rd/wikibot.py
173
174
175
176
177
178
179
180
181
182
183
184
185
def getHtml(self, pageTitle):
    """
    get the HTML code for the given page Title

    Args:
        pageTitle(str): the title of the page to retrieve

    Returns:
        str: the rendered HTML code for the page
    """
    # the page object renders itself via get_parsed_page
    return self.getPage(pageTitle).get_parsed_page()

getPage(pageTitle)

Get the page with the given title. Args: pageTitle(str): the title of the page to retrieve. Returns: Page: the pywikibot page for the given pageTitle.

Source code in wikibot3rd/wikibot.py
187
188
189
190
191
192
193
194
195
def getPage(self, pageTitle):
    """
    get the page with the given title

    Args:
        pageTitle(str): the title of the page to retrieve

    Returns:
        Page: the pywikibot page of my site for the given pageTitle
    """
    return pywikibot.Page(self.site, pageTitle)

getWikiMarkup(pageTitle)

get the wiki markup code (text) for the given page Title

Parameters:

Name Type Description Default
pageTitle(str)

the title of the page to retrieve

required

Returns:

Name Type Description
str

the wiki markup code for the page

Source code in wikibot3rd/wikibot.py
159
160
161
162
163
164
165
166
167
168
169
170
171
def getWikiMarkup(self, pageTitle):
    """
    get the wiki markup code (text) for the given page Title

    Args:
        pageTitle(str): the title of the page to retrieve

    Returns:
        str: the wiki markup code for the page
    """
    # the markup is the text attribute of the page object
    return self.getPage(pageTitle).text

register_family_file(familyName, famfile)

register the family file

Parameters:

Name Type Description Default
familyName(str)

the familyName to register

required
famfile(str)

the path to the family file

required
Source code in wikibot3rd/wikibot.py
85
86
87
88
89
90
91
92
93
94
95
def register_family_file(self, familyName: str, famfile: str):
    """
    register the family file

    Args:
        familyName(str): the familyName to register
        famfile(str): the path to the family file
    """
    # config2.register_family_file is deprecated - the family file
    # mapping of the pywikibot config is updated directly instead
    family_registry = pywikibot.config.family_files
    family_registry[familyName] = famfile

savePage(pageTitle, pageContent, pageSummary)

save a page with the given pageTitle, pageContent and pageSummary

Source code in wikibot3rd/wikibot.py
197
198
199
200
201
def savePage(self, pageTitle, pageContent, pageSummary):
    """
    save a page with the given pageTitle, pageContent and pageSummary

    Args:
        pageTitle(str): the title of the page to save
        pageContent(str): the new text of the page
        pageSummary(str): the summary passed to the page's save call
    """
    target = self.getPage(pageTitle)
    # the content is set first, then saved with the summary
    target.text = pageContent
    target.save(pageSummary)

wikiclient

WikiClient

Bases: Wiki

Access MediaWiki via mwclient library.

Source code in wikibot3rd/wikiclient.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
class WikiClient(Wiki):
    """
    Access MediaWiki via mwclient library.
    """

    def __init__(self, wiki_user: WikiUser, debug: bool = False):
        """
        Initialize the WikiClient with a WikiUser and an optional debug mode.

        Args:
            wiki_user: A WikiUser instance containing login credentials.
            debug: A flag to enable debug mode.
        """
        super(WikiClient, self).__init__(wiki_user, debug=debug)
        self.wiki_user: WikiUser = wiki_user
        # compatibility: keep the camelCase attribute name available
        self.wikiUser = self.wiki_user
        # created on first use by get_site
        self.site: Optional[Site] = None

    def get_site(self) -> Site:
        """
        Get the Site object for the MediaWiki site.

        The Site is created on the first call and reused afterwards.

        Returns:
            The Site object representing the MediaWiki site.
        """
        if self.site is None:
            o = urlparse(self.wiki_user.url)
            scheme = o.scheme
            host = o.netloc
            # url path plus script path, terminated with a slash
            path = o.path + self.wiki_user.scriptPath
            path = f"{path}/"
            self.site = Site(host=host, path=path, scheme=scheme)
        return self.site

    def getSite(self) -> Site:
        """Deprecated: Use get_site instead."""
        return self.get_site()

    def needs_login(self) -> bool:
        """
        Check if login is required for the wiki.

        Returns:
            True if login is required, False otherwise.
        """
        site = self.get_site()
        # login is considered necessary when the site's writeapi flag is falsy
        login_needed: bool = not site.writeapi
        return login_needed

    def needsLogin(self) -> bool:
        """Deprecated: Use needs_login instead."""
        return self.needs_login()

    def try_login(self) -> Optional[Exception]:
        """
        Attempt to log in to the MediaWiki site.

        Returns:
            None if login is successful, otherwise the exception that was raised.
        """
        wu = self.wiki_user
        try:
            self.get_site().login(username=wu.user, password=wu.get_password())
        except Exception as ex:
            # deliberate: the failure is handed to the caller instead of raised
            return ex
        return None

    def login(self) -> bool:
        """
        Attempt to log in to the MediaWiki site.

        In debug mode a failed login is reported on stdout.

        Returns:
            True if login is successful, False otherwise.
        """
        ex = self.try_login()
        if ex is not None and self.debug:
            print(f"Login failed: {ex}")
        success = ex is None
        return success

    def get_wiki_markup(self, page_title: str) -> str:
        """
        Get the wiki markup for a given page title.

        Args:
            page_title: The title of the page to retrieve the markup for.

        Returns:
            The wiki markup of the specified page.
        """
        page = self.get_page(page_title)
        markup = page.text()
        return markup

    def getWikiMarkup(self, pageTitle: str) -> str:
        """Deprecated: Use get_wiki_markup instead."""
        return self.get_wiki_markup(pageTitle)

    def get_html(self, page_title: str) -> str:
        """
        Get the HTML content for a given page title.

        Args:
            page_title: The title of the page to retrieve the HTML for.

        Returns:
            The HTML content of the specified page.

        Raises:
            Exception: if the API response has no "parse" entry.
        """
        api = self.get_site().api("parse", page=page_title)
        if "parse" not in api:
            raise Exception(f"Could not retrieve HTML for page {page_title}")
        html: str = api["parse"]["text"]["*"]
        return html

    def getHtml(self, pageTitle: str) -> str:
        """Deprecated: Use get_html instead."""
        return self.get_html(pageTitle)

    def get_page(self, page_title: str) -> Any:
        """
        Get the page object for a given title.

        Args:
            page_title: The title of the page to retrieve.

        Returns:
            The page object for the specified title.
        """
        page = self.get_site().pages[page_title]
        return page

    def getPage(self, pageTitle: str) -> Any:
        """Deprecated: Use get_page instead."""
        return self.get_page(pageTitle)

    def save_page(self, page_title: str, page_content: str, page_summary: str) -> None:
        """
        Save a page with given title and content.

        Args:
            page_title: The title of the page.
            page_content: The new content of the page.
            page_summary: A summary of the changes made.
        """
        new_page = self.get_page(page_title)
        new_page.edit(page_content, page_summary)

    def savePage(self, pageTitle: str, pageContent: str, pageSummary: str) -> None:
        """Deprecated: Use save_page instead."""
        self.save_page(pageTitle, pageContent, pageSummary)

    def get_site_statistics(self) -> Dict[str, Any]:
        """
        Fetch site statistics using the MediaWiki API.

        Returns:
            A dictionary containing the site statistics.
        """
        params = {
            "action": "query",
            "meta": "siteinfo",
            "siprop": "statistics",
            "format": "json",
        }
        site = self.get_site()
        data = site.api(**params)
        statistics = data["query"]["statistics"]
        return statistics

    def getSiteStatistics(self) -> Dict[str, Any]:
        """Deprecated: Use get_site_statistics instead."""
        return self.get_site_statistics()

    @staticmethod
    def get_clients() -> Dict[str, "WikiClient"]:
        """
        Get a dictionary of WikiClient instances for all WikiUsers.

        Returns:
            Dict[str, WikiClient]: A dictionary with wiki user IDs as keys and WikiClient instances as values.
        """
        clients: Dict[str, WikiClient] = {}
        for wiki_user in WikiUser.getWikiUsers().values():
            wiki_client = WikiClient(wiki_user)
            clients[wiki_user.wikiId] = wiki_client
        return clients

    @staticmethod
    def getClients() -> Dict[str, "WikiClient"]:
        """Deprecated: Use get_clients instead."""
        return WikiClient.get_clients()

    @staticmethod
    def of_wiki_id(
        wiki_id: str, lenient: bool = True, debug: bool = False
    ) -> "WikiClient":
        """
        Create a WikiClient instance for a specific wiki ID.

        Args:
            wiki_id: The ID of the wiki to create a client for.
            lenient: Whether to be lenient in case of errors.
            debug: Whether to enable debug output.

        Returns:
            WikiClient: A WikiClient instance for the given wiki ID.
        """
        wiki_user = WikiUser.ofWikiId(wiki_id, lenient=lenient)
        wiki_client = WikiClient(wiki_user, debug=debug)
        return wiki_client

    @staticmethod
    def ofWikiId(
        wiki_id: str, lenient: bool = True, debug: bool = False
    ) -> "WikiClient":
        """Deprecated: Use of_wiki_id instead."""
        return WikiClient.of_wiki_id(wiki_id, lenient, debug)

    @staticmethod
    def of_wiki_user(wiki_user: WikiUser) -> "WikiClient":
        """
        Create a WikiClient instance from a WikiUser object.

        Args:
            wiki_user: A WikiUser instance to create a WikiClient for.

        Returns:
            WikiClient: A WikiClient instance for the given WikiUser.
        """
        wiki_client = WikiClient(wiki_user)
        return wiki_client

    @staticmethod
    def ofWikiUser(wiki_user: WikiUser) -> "WikiClient":
        """Deprecated: Use of_wiki_user instead."""
        return WikiClient.of_wiki_user(wiki_user)

__init__(wiki_user, debug=False)

Initialize the WikiClient with a WikiUser and an optional debug mode.

Parameters:

Name Type Description Default
wiki_user WikiUser

A WikiUser instance containing login credentials.

required
debug bool

A flag to enable debug mode.

False
Source code in wikibot3rd/wikiclient.py
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, wiki_user: WikiUser, debug: bool = False):
    """
    Set up the client for the given wiki user.

    Args:
        wiki_user: A WikiUser instance containing login credentials.
        debug: A flag to enable debug mode.
    """
    super(WikiClient, self).__init__(wiki_user, debug=debug)
    # no Site object yet
    self.site: Optional[Site] = None
    self.wiki_user: WikiUser = wiki_user
    # compatibility: the camelCase attribute refers to the same wiki user
    self.wikiUser = self.wiki_user

getClients() staticmethod

Deprecated: Use get_clients instead.

Source code in wikibot3rd/wikiclient.py
198
199
200
201
@staticmethod
def getClients() -> Dict[str, "WikiClient"]:
    """Deprecated camelCase alias of get_clients."""
    clients = WikiClient.get_clients()
    return clients

getHtml(pageTitle)

Deprecated: Use get_html instead.

Source code in wikibot3rd/wikiclient.py
125
126
127
def getHtml(self, pageTitle: str) -> str:
    """Deprecated camelCase alias of get_html."""
    html = self.get_html(pageTitle)
    return html

getPage(pageTitle)

Deprecated: Use get_page instead.

Source code in wikibot3rd/wikiclient.py
142
143
144
def getPage(self, pageTitle: str) -> Any:
    """Deprecated camelCase alias of get_page."""
    page = self.get_page(pageTitle)
    return page

getSite()

Deprecated: Use get_site instead.

Source code in wikibot3rd/wikiclient.py
45
46
47
def getSite(self) -> Site:
    """Deprecated camelCase alias of get_site."""
    site = self.get_site()
    return site

getSiteStatistics()

Deprecated: Use get_site_statistics instead.

Source code in wikibot3rd/wikiclient.py
180
181
182
def getSiteStatistics(self) -> Dict[str, Any]:
    """Deprecated camelCase alias of get_site_statistics."""
    statistics = self.get_site_statistics()
    return statistics

getWikiMarkup(pageTitle)

Deprecated: Use get_wiki_markup instead.

Source code in wikibot3rd/wikiclient.py
105
106
107
def getWikiMarkup(self, pageTitle: str) -> str:
    """Deprecated camelCase alias of get_wiki_markup."""
    markup = self.get_wiki_markup(pageTitle)
    return markup

get_clients() staticmethod

Get a dictionary of WikiClient instances for all WikiUsers.

Returns:

Type Description
Dict[str, WikiClient]

Dict[str, WikiClient]: A dictionary with wiki user IDs as keys and WikiClient instances as values.

Source code in wikibot3rd/wikiclient.py
184
185
186
187
188
189
190
191
192
193
194
195
196
@staticmethod
def get_clients() -> Dict[str, "WikiClient"]:
    """
    Create one WikiClient per known WikiUser.

    Returns:
        Dict[str, WikiClient]: the clients keyed by the wikiId of their wiki user.
    """
    known_users = WikiUser.getWikiUsers().values()
    return {user.wikiId: WikiClient(user) for user in known_users}

get_html(page_title)

Get the HTML content for a given page title.

Parameters:

Name Type Description Default
page_title str

The title of the page to retrieve the HTML for.

required

Returns:

Type Description
str

The HTML content of the specified page.

Source code in wikibot3rd/wikiclient.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def get_html(self, page_title: str) -> str:
    """
    Fetch the rendered HTML of a page via the site's "parse" API action.

    Args:
        page_title: The title of the page to retrieve the HTML for.

    Returns:
        The HTML content of the specified page.

    Raises:
        Exception: if the API response has no "parse" entry.
    """
    response = self.get_site().api("parse", page=page_title)
    if "parse" in response:
        # the HTML is stored under the "*" key of the text entry
        return response["parse"]["text"]["*"]
    raise Exception(f"Could not retrieve HTML for page {page_title}")

get_page(page_title)

Get the page object for a given title.

Parameters:

Name Type Description Default
page_title str

The title of the page to retrieve.

required

Returns:

Type Description
Any

The page object for the specified title.

Source code in wikibot3rd/wikiclient.py
129
130
131
132
133
134
135
136
137
138
139
140
def get_page(self, page_title: str) -> Any:
    """
    Look up a page in the pages collection of the site.

    Args:
        page_title: The title of the page to retrieve.

    Returns:
        The page object for the specified title.
    """
    pages = self.get_site().pages
    return pages[page_title]

get_site()

Get the Site object for the MediaWiki site.

Returns:

Type Description
Site

The Site object representing the MediaWiki site.

Source code in wikibot3rd/wikiclient.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def get_site(self) -> Site:
    """
    Return the Site object of the wiki, creating it on first use.

    Returns:
        The Site object representing the MediaWiki site.
    """
    if self.site is not None:
        return self.site
    parsed = urlparse(self.wiki_user.url)
    # url path plus script path, terminated with a slash
    site_path = parsed.path + self.wiki_user.scriptPath
    site_path = f"{site_path}/"
    self.site = Site(host=parsed.netloc, path=site_path, scheme=parsed.scheme)
    return self.site

get_site_statistics()

Fetch site statistics using the MediaWiki API.

Returns:

Type Description
Dict[str, Any]

A dictionary containing the site statistics.

Source code in wikibot3rd/wikiclient.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def get_site_statistics(self) -> Dict[str, Any]:
    """
    Query the siteinfo statistics of the wiki via the MediaWiki API.

    Returns:
        A dictionary containing the site statistics.
    """
    response = self.get_site().api(
        action="query",
        meta="siteinfo",
        siprop="statistics",
        format="json",
    )
    return response["query"]["statistics"]

get_wiki_markup(page_title)

Get the wiki markup for a given page title.

Parameters:

Name Type Description Default
page_title str

The title of the page to retrieve the markup for.

required

Returns:

Type Description
str

The wiki markup of the specified page.

Source code in wikibot3rd/wikiclient.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def get_wiki_markup(self, page_title: str) -> str:
    """
    Return the wiki markup of the page with the given title.

    Args:
        page_title: The title of the page to retrieve the markup for.

    Returns:
        The result of calling ``text()`` on the page obtained via ``get_page``.
    """
    return self.get_page(page_title).text()

login()

Attempt to log in to the MediaWiki site.

Returns:

Type Description
bool

True if login is successful, False otherwise.

Source code in wikibot3rd/wikiclient.py
78
79
80
81
82
83
84
85
86
87
88
89
def login(self) -> bool:
    """
    Log in to the MediaWiki site and report whether it worked.

    Delegates to ``try_login``; when that returns an exception and debug
    mode is on, the failure is printed.

    Returns:
        True if login is successful, False otherwise.
    """
    failure = self.try_login()
    logged_in = failure is None
    if failure and self.debug:
        print(f"Login failed: {failure}")
    return logged_in

needsLogin()

Deprecated: Use needs_login instead.

Source code in wikibot3rd/wikiclient.py
60
61
62
def needsLogin(self) -> bool:
    """Deprecated camelCase alias: use needs_login instead."""
    login_required = self.needs_login()
    return login_required

needs_login()

Check if login is required for the wiki.

Returns:

Type Description
bool

True if login is required, False otherwise.

Source code in wikibot3rd/wikiclient.py
49
50
51
52
53
54
55
56
57
58
def needs_login(self) -> bool:
    """
    Tell whether a login is required for the wiki.

    Returns:
        True if the site's ``writeapi`` attribute is falsy (login required),
        False otherwise.
    """
    return not self.get_site().writeapi

ofWikiId(wiki_id, lenient=True, debug=False) staticmethod

Deprecated: Use of_wiki_id instead.

Source code in wikibot3rd/wikiclient.py
222
223
224
225
226
227
@staticmethod
def ofWikiId(
    wiki_id: str, lenient: bool = True, debug: bool = False
) -> "WikiClient":
    """Deprecated camelCase alias: use of_wiki_id instead."""
    client = WikiClient.of_wiki_id(wiki_id, lenient, debug)
    return client

ofWikiUser(wiki_user) staticmethod

Deprecated: Use of_wiki_user instead.

Source code in wikibot3rd/wikiclient.py
243
244
245
246
@staticmethod
def ofWikiUser(wiki_user: WikiUser) -> "WikiClient":
    """Deprecated camelCase alias: use of_wiki_user instead."""
    client = WikiClient.of_wiki_user(wiki_user)
    return client

of_wiki_id(wiki_id, lenient=True, debug=False) staticmethod

Create a WikiClient instance for a specific wiki ID.

Parameters:

Name Type Description Default
wiki_id str

The ID of the wiki to create a client for.

required
lenient bool

Whether to be lenient in case of errors.

True
debug bool

Whether to enable debug output.

False

Returns:

Name Type Description
WikiClient WikiClient

A WikiClient instance for the given wiki ID.

Source code in wikibot3rd/wikiclient.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@staticmethod
def of_wiki_id(
    wiki_id: str, lenient: bool = True, debug: bool = False
) -> "WikiClient":
    """
    Build a WikiClient for the wiki with the given ID.

    The wiki user is looked up via ``WikiUser.ofWikiId`` and handed to the
    WikiClient constructor.

    Args:
        wiki_id: The ID of the wiki to create a client for.
        lenient: Passed on to the wiki user lookup.
        debug: Whether to enable debug output in the created client.

    Returns:
        WikiClient: A WikiClient instance for the given wiki ID.
    """
    return WikiClient(WikiUser.ofWikiId(wiki_id, lenient=lenient), debug=debug)

of_wiki_user(wiki_user) staticmethod

Create a WikiClient instance from a WikiUser object.

Parameters:

Name Type Description Default
wiki_user WikiUser

A WikiUser instance to create a WikiClient for.

required

Returns:

Name Type Description
WikiClient WikiClient

A WikiClient instance for the given WikiUser.

Source code in wikibot3rd/wikiclient.py
229
230
231
232
233
234
235
236
237
238
239
240
241
@staticmethod
def of_wiki_user(wiki_user: WikiUser) -> "WikiClient":
    """
    Build a WikiClient for an already available WikiUser.

    Args:
        wiki_user: A WikiUser instance to create a WikiClient for.

    Returns:
        WikiClient: A WikiClient instance for the given WikiUser.
    """
    return WikiClient(wiki_user)

savePage(pageTitle, pageContent, pageSummary)

Deprecated: Use save_page instead.

Source code in wikibot3rd/wikiclient.py
158
159
160
def savePage(self, pageTitle: str, pageContent: str, pageSummary: str) -> None:
    """Deprecated camelCase alias: use save_page instead."""
    self.save_page(
        pageTitle,
        pageContent,
        pageSummary,
    )

save_page(page_title, page_content, page_summary)

Save a page with given title and content.

Parameters:

Name Type Description Default
page_title str

The title of the page.

required
page_content str

The new content of the page.

required
page_summary str

A summary of the changes made.

required
Source code in wikibot3rd/wikiclient.py
146
147
148
149
150
151
152
153
154
155
156
def save_page(self, page_title: str, page_content: str, page_summary: str) -> None:
    """
    Store new content for the page with the given title.

    The page is fetched via ``get_page`` and its ``edit`` method is called
    with the content and the summary.

    Args:
        page_title: The title of the page.
        page_content: The new content of the page.
        page_summary: A summary of the changes made.
    """
    self.get_page(page_title).edit(page_content, page_summary)

try_login()

Attempt to log in to the MediaWiki site.

Returns:

Name Type Description
Exception Exception

None if login is successful, Exception otherwise.

Source code in wikibot3rd/wikiclient.py
64
65
66
67
68
69
70
71
72
73
74
75
76
def try_login(self) -> Exception:
    """
    Try to log in to the MediaWiki site with the wiki user's credentials.

    Any exception raised while obtaining the site or logging in is caught
    and handed back to the caller instead of being propagated.

    Returns:
        Exception: None if login is successful, the caught exception otherwise.
    """
    credentials = self.wiki_user
    try:
        site = self.get_site()
        site.login(username=credentials.user, password=credentials.get_password())
    except Exception as login_error:
        return login_error
    return None

wikiedit

Created on 2020-11-12

@author: wf

wikinuke

Created on 2020-11-12

@author: wf

wikipush

Created on 2020-10-29 @author: wf @copyright: Wolfgang Fahl. All rights reserved.

WikiPush

Bases: object

Push pages from one MediaWiki to another

Source code in wikibot3rd/wikipush.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
class WikiPush(object):
    """
    Push pages from one MediaWiki to another
    """

    differ = None

    def __init__(
        self,
        fromWikiId: str,
        toWikiId: str = None,
        login: bool = False,
        verbose: bool = True,
        debug: bool = False,
    ):
        """
        Constructor

        Args:
            fromWikiId(str): the id of the wiki to push from (source)
            toWikiID(str): the id of the wiki to push to (target)
            login(bool): if True login to source wiki
            verbose(bool): if True print info messages
            debug(bool): if True show debugging messages
        """
        self.verbose = verbose
        self.debug = debug
        self.fromWiki = None
        self.toWiki = None

        self.fromWikiId = fromWikiId
        if self.fromWikiId is not None:
            self.fromWiki = WikiClient.ofWikiId(fromWikiId, debug=self.debug)
        self.toWikiId = toWikiId
        if self.toWikiId is not None:
            self.toWiki = WikiClient.ofWikiId(toWikiId, debug=self.debug)
        if login and self.fromWikiId is not None:
            if not self.fromWiki.login():
                msg = f"can't login to source Wiki {fromWikiId}"
                raise Exception(msg)
        if self.toWiki is not None:
            if not self.toWiki.login():
                msg = f"can't login to target Wiki {toWikiId}"
                raise Exception(msg)

    def log(self, msg: str, end="\n"):
        """
        show the given message if verbose is on

        Args:
            msg(str): the message to display
        """
        if self.verbose:
            print(msg, end=end)

    def formatQueryResult(
        self,
        askQuery,
        wiki=None,
        limit=None,
        showProgress=False,
        queryDivision=1,
        outputFormat="lod",
        entityName="data",
        title: str = None,
    ):
        """
        format the query result for the given askQuery.
        Args:
             askQuery(string): Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries
            wiki(wikibot3rd): the wiki to query - use fromWiki if not specified
            limit(int): the limit for the query (optional)
            showProgress(bool): true if progress of the query retrieval should be indicated (default: one dot per 50 records ...)
            queryDivision(int): Defines the number of subintervals the query is divided into (must be greater equal 1)
            outputFormat(str): output format of the query results - default format is lod
            entityName(str): the name of the entity
            title(str): the title of the query (if any)
        Returns:
            Query results in the requested outputFormat as string.
            If the requested outputFormat is not supported None is returned.
        """
        pageRecords = self.queryPages(
            askQuery, wiki, limit, showProgress, queryDivision
        )
        outputFormat = outputFormat.lower()
        if outputFormat == "csv":
            return self.convertToCSV(pageRecords)
        elif outputFormat == "json":
            res = []
            for page in pageRecords.values():
                res.append(page)
            res_json = json.dumps({entityName: res}, default=str, indent=3)
            return res_json
        elif outputFormat == "lod":
            return [pageRecord for pageRecord in pageRecords.values()]
        else:
            if title is None:
                title = entityName
            query = Query(name=entityName, query=askQuery, title=title)
            qlod = [pageRecord for pageRecord in pageRecords.values()]
            doc = query.documentQueryResult(
                qlod, limit, tablefmt=outputFormat, withSourceCode=False
            )
            return doc
            # if self.debug:
            #    print(f"Format {outputFormat} is not supported.")

    def convertToCSV(self, pageRecords, separator=";"):
        """
        Converts the given pageRecords into a str in csv format
        ToDO: Currently does not support escaping of the separator and escaping of quotes
        Args:
            pageRecords: dict of dicts containing the printouts
            separator(char):
        Returns: str
        """
        res = ""
        printedHeaders = False
        for pageRecord in pageRecords.values():
            if not printedHeaders:
                for key in pageRecord.keys():
                    res = f"{res}{key}{separator}"
                res = f"{res[:-1]}\n"
                printedHeaders = True
            for printouts in pageRecord.values():
                res = f"{res}{printouts}{separator}"
            res = f"{res[:-1]}\n"  # remove last separator and end line
        return res

    def queryPages(
        self, askQuery, wiki=None, limit=None, showProgress=False, queryDivision=1
    ) -> dict:
        """
        run the given askQuery against a wiki via an SMWClient

        Args:
            askQuery(string): Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries
            wiki(wikibot3rd): the wiki to query - use fromWiki if not specified
            limit(int): the limit for the query (optional)
            showProgress(bool): true if progress of the query retrieval should be indicated (default: one dot per 50 records ...)
            queryDivision(int): Defines the number of subintervals the query is divided into (must be greater equal 1)
        Returns:
            dict: the pageRecords matching the given askQuery as returned by SMWClient.query
        """
        sourceWiki = self.fromWiki if wiki is None else wiki
        client = SMWClient(
            sourceWiki.getSite(),
            showProgress=showProgress,
            queryDivision=queryDivision,
            debug=self.debug,
        )
        return client.query(askQuery, limit=limit)

    def query(
        self,
        askQuery,
        wiki=None,
        queryField=None,
        limit=None,
        showProgress=False,
        queryDivision=1,
    ):
        """
        query the given wiki for pages matching the given askQuery

        Args:
            askQuery(string): Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries
            wiki(wikibot3rd): the wiki to query - use fromWiki if not specified
            queryField(string): the field to select the pageTitle from
            limit(int): the limit for the query (optional)
            showProgress(bool): true if progress of the query retrieval should be indicated (default: one dot per 50 records ...)
        Returns:
            list: a list of pageTitles matching the given askQuery
        """
        pageRecords = self.queryPages(
            askQuery, wiki, limit, showProgress, queryDivision
        )
        if queryField is None:
            return pageRecords.keys()
        # use a Dict to remove duplicates
        pagesDict = {}
        for pageRecord in pageRecords.values():
            if queryField in pageRecord:
                pagesDict[pageRecord[queryField]] = True
        return list(pagesDict.keys())

    def nuke(self, pageTitles, force=False):
        """
        delete the pages with the given page Titles

        Args:
            pageTitles(list): a list of page titles to be transfered from the formWiki to the toWiki
            force(bool): True if pages should be actually deleted - dry run only listing pages is default
        """
        total = len(pageTitles)
        self.log(
            "deleting %d pages in %s (%s)"
            % (total, self.toWikiId, "forced" if force else "dry run")
        )
        for i, pageTitle in enumerate(pageTitles):
            try:
                self.log(
                    "%d/%d (%4.0f%%): deleting %s ..."
                    % (i + 1, total, (i + 1) / total * 100, pageTitle),
                    end="",
                )
                pageToBeDeleted = self.toWiki.getPage(pageTitle)
                if not force:
                    self.log("👍" if pageToBeDeleted.exists else "👎")
                else:
                    pageToBeDeleted.delete("deleted by wiknuke")
                    self.log("✅")
            except Exception as ex:
                self.show_exception(ex)

    @staticmethod
    def getDiff(text: str, newText: str, n: int = 1, forHuman: bool = True) -> str:
        """
        Compare the two given strings and return the differences
        Args:
            text: old text to compare the new text to
            newText: new text
            n: The number of context lines
            forHuman: If True update the diff string to be better human-readable

        Returns:
            str: difference string
        """
        # if WikiPush.differ is None:
        #    WikiPush.differ=Differ()
        # https://docs.python.org/3/library/difflib.html
        #  difflib.unified_diff(a, b, fromfile='', tofile='', fromfiledate='', tofiledate='', n=3, lineterm='\n')¶
        # diffs=WikiPush.differ.compare(,)
        textLines = text.split("\n")
        newTextLines = newText.split("\n")
        diffs = difflib.unified_diff(textLines, newTextLines, n=n)
        if forHuman:
            hdiffs = []
            for line in diffs:
                unwantedItems = ["@@", "---", "+++"]
                keep = True
                for unwanted in unwantedItems:
                    if unwanted in line:
                        keep = False
                if keep:
                    hdiffs.append(line)
        else:
            hdiffs = diffs
        diffStr = "\n".join(hdiffs)
        return diffStr

    @staticmethod
    def getModify(
        search: str, replace: str, debug: bool = False
    ) -> typing.Callable[[str], str]:
        """
        get the modification function

        Args:
            search(str): the search string
            replace(str): the replace string
            debug(bool): if debug show

        Returns:
            String modify function that takes as input the string, applies the search and replace action
             and returns the modified string
        """
        if debug:
            print(f"search regex: {search}")
            print(f"replace regex: {replace}")
        searchRegex = r"%s" % search
        replaceRegex = r"%s" % replace
        modify = lambda text: re.sub(searchRegex, replaceRegex, text)
        return modify

    def edit(
        self,
        pageTitles: typing.List[str],
        modify: typing.Callable[[str], str] = None,
        context: int = 1,
        force: bool = False,
    ):
        """
        edit the pages with the given page Titles

        Args:
            pageTitles(list): a list of page titles to be transferred from the formWiki to the toWiki
            modify: String modify function that takes as input the string and returns the modified string
            context: The number of context lines
            force(bool): True if pages should be actually deleted - dry run only listing pages is default
        """
        if modify is None:
            raise Exception("wikipush edit needs a modify function!")
        total = len(pageTitles)
        self.log(
            "editing %d pages in %s (%s)"
            % (total, self.toWikiId, "forced" if force else "dry run")
        )
        for i, pageTitle in enumerate(pageTitles):
            try:
                self.log(
                    "%d/%d (%4.0f%%): editing %s ..."
                    % (i + 1, total, (i + 1) / total * 100, pageTitle),
                    end="",
                )
                pageToBeEdited = self.toWiki.getPage(pageTitle)
                if not force and not pageToBeEdited.exists:
                    self.log("👎")
                else:
                    comment = "edited by wikiedit"
                    text = pageToBeEdited.text()
                    newText = modify(text)
                    if newText != text:
                        if force:
                            pageToBeEdited.edit(newText, comment)
                            self.log("✅")
                        else:
                            diffStr = self.getDiff(text, newText, n=context)
                            self.log(f"👍{diffStr}")
                    else:
                        self.log("↔")
            except Exception as ex:
                self.show_exception(ex)

    def edit_wikison(
        self,
        page_titles: typing.List[str],
        entity_type_name: str,
        property_name: str,
        value: typing.Any,
        force: bool = False,
    ):
        """
        Edit the WikiSON for on the given pages
        Args:
            page_titles: a list of page titles to be edited
            entity_type_name: name of the WikiSON entity type
            property_name: name of the property to edit
            value: value to set. If None property is deleted from the WikiSON
            force: If False only print the changes. Otherwise, apply the changes
        """
        total = len(page_titles)
        self.log(
            f"""editing {total} pages in {self.toWikiId} ({"forced" if force else "dry run"})"""
        )
        for i, page_title in enumerate(page_titles, 1):
            try:
                self.log(
                    f"{i}/{total} ({i/total*100:.2f}%): editing {page_title} ...",
                    end="",
                )
                page_to_be_edited = self.toWiki.getPage(page_title)
                if not force and not page_to_be_edited.exists:
                    self.log("👎")
                else:
                    comment = "edited by wikiedit"
                    markup = page_to_be_edited.text()
                    wikison = WikiSON(page_title, markup)
                    new_markup = wikison.set(
                        entity_type_name=entity_type_name, record={property_name: value}
                    )
                    if new_markup != markup:
                        if force:
                            page_to_be_edited.edit(new_markup, comment)
                            self.log("✅")
                        else:
                            diff_str = self.getDiff(markup, new_markup, n=3)
                            self.log(f"👍{diff_str}")
                    else:
                        self.log("↔")
            except Exception as ex:
                self.show_exception(ex)

    def upload(self, files, force=False):
        """
        push the given files
        Args:
            files(list): a list of filenames to be transfered to the toWiki
            force(bool): True if images should be overwritten if they exist
        """
        total = len(files)
        self.log("uploading %d files to %s" % (total, self.toWikiId))
        for i, file in enumerate(files):
            try:
                self.log(
                    "%d/%d (%4.0f%%): uploading %s ..."
                    % (i + 1, total, (i + 1) / total * 100, file),
                    end="",
                )
                description = "uploaded by wikiupload"
                filename = os.path.basename(file)
                self.uploadImage(file, filename, description, force)
                self.log("✅")
            except Exception as ex:
                self.show_exception(ex)

    def backup(self, pageTitles, backupPath=None, git=False, withImages=False):
        """
        backup the given page titles
        Args:
            pageTitles(list): a list of page titles to be downloaded from the fromWiki
            git(bool): True if git should be used as a version control system
            withImages(bool): True if the image on a page should also be copied
        """
        if backupPath is None:
            backupPath = self.getHomePath("wikibackup/%s" % self.fromWikiId)
        imageBackupPath = "%s/images" % backupPath
        total = len(pageTitles)
        self.log(
            "downloading %d pages from %s to %s" % (total, self.fromWikiId, backupPath)
        )
        for i, pageTitle in enumerate(pageTitles):
            try:
                self.log(
                    "%d/%d (%4.0f%%): downloading %s ..."
                    % (i + 1, total, (i + 1) / total * 100, pageTitle),
                    end="",
                )
                page = self.fromWiki.getPage(pageTitle)
                wikiFilePath = "%s/%s.wiki" % (backupPath, pageTitle)
                self.ensureParentDirectoryExists(wikiFilePath)
                with open(wikiFilePath, "w") as wikiFile:
                    wikiFile.write(page.text())
                self.log("✅")
                if isinstance(page, Image):
                    self.backupImages([page], imageBackupPath)
                if withImages:
                    self.backupImages(page.images(), imageBackupPath)

            except Exception as ex:
                self.show_exception(ex)
        if git:
            gitPath = "%s/.git" % backupPath
            if not os.path.isdir(gitPath):
                self.log("initializing git repository ...")
                repo = Repo.init(backupPath)
            else:
                repo = Repo(backupPath)
            self.log("committing to git repository")
            repo.git.add(all=True)
            timestamp = datetime.datetime.now().isoformat()
            repo.index.commit("auto commit by wikibackup at %s" % timestamp)

    def backupImages(self, imageList: list, imageBackupPath: str):
        """
        backup the images in the givne imageList

        Args:
            imageList(list): the list of images
            imageBackupPath(str): the path to the backup directory
        """
        for image in imageList:
            try:
                imagePath, filename = self.downloadImage(image, imageBackupPath)
            except Exception as ex:
                self.handleException(ex)

    def work(
        self,
        pageTitles: list,
        activity: str = "copying",
        comment: str = "pushed",
        force: bool = False,
        ignore: bool = False,
        withImages: bool = False,
    ) -> list:
        """
        work on the given page titles

        Args:
            pageTitles(list): a list of page titles to be transfered from the formWiki to the toWiki
            activity(str): the activity to perform
            comment(str): the comment to display
            force(bool): True if pages should be overwritten if they exist
            ignore(bool): True if warning for images should be ignored (e.g if they exist)
            withImages(bool): True if the image on a page should also be copied
        Returns:
            list: a list of pageTitles for which the activity failed
        """
        failed = []
        total = len(pageTitles)
        self.log(f"{activity} {total} pages from {self.fromWikiId} to {self.toWikiId}")
        for i, pageTitle in enumerate(pageTitles):
            try:
                percent = (i + 1) / total * 100
                self.log(
                    f"{i+1}/{total} ({percent:4.0f}%): {activity} ... {pageTitle}",
                    end="",
                )
                page = self.fromWiki.getPage(pageTitle)
                if page.exists:
                    # is this an image?
                    if isinstance(page, Image):
                        self.pushImages([page], ignore=ignore)
                    else:
                        newPage = self.toWiki.getPage(pageTitle)
                        if not newPage.exists or force:
                            try:
                                newPage.edit(page.text(), comment)
                                self.log("✅")
                                pageOk = True
                            except Exception as ex:
                                pageOk = self.handleException(ex, ignore)
                                if not pageOk:
                                    failed.append(pageTitle)
                            if withImages and pageOk:
                                self.pushImages(page.images(), ignore=ignore)
                        else:
                            self.log("👎")
                else:
                    self.log("❌")
                    failed.append(pageTitle)
            except Exception as ex:
                self.show_exception(ex)
                failed.append(pageTitle)
        return failed

    def push(self, pageTitles, force=False, ignore=False, withImages=False) -> list:
        """
        push the given page titles

        Args:
            pageTitles(list): a list of page titles to be transfered from the formWiki to the toWiki
            force(bool): True if pages should be overwritten if they exist
            ignore(bool): True if warning for images should be ignored (e.g if they exist)
            withImages(bool): True if the image on a page should also be copied
        Returns:
            list: a list of pageTitles for which the activity failed
        """
        comment = f"pushed from {self.fromWikiId} by wikipush"
        return self.work(
            pageTitles,
            activity="copying",
            comment=comment,
            force=force,
            ignore=ignore,
            withImages=withImages,
        )

    def ensureParentDirectoryExists(self, filePath: str):
        """
        for pages that have a "/" in the name make sure that the parent Directory exists

        Args:
            filePath(str): the filePath to check
        """
        directory = os.path.dirname(filePath)
        self.ensureDirectoryExists(directory)

    def ensureDirectoryExists(self, directory: str):
        """
        make sure the given directory exists

        Args:
            directory(str): the directory to check for existence
        """
        Path(directory).mkdir(parents=True, exist_ok=True)

    def getHomePath(self, localPath):
        """
        get the given home path
        """
        homePath = f"{Path.home()}/{localPath}"
        self.ensureDirectoryExists(homePath)
        return homePath

    def getDownloadPath(self):
        """
        get the download path
        """
        return self.getHomePath("Downloads/mediawiki")

    def pushImages(self, imageList, delim="", ignore=False):
        """
        push the images in the given image List

        Each image is downloaded with downloadImage and uploaded with uploadImage.
        A failure for one image is reported and does not stop the processing of
        the remaining images.

        Args:
            imageList(list): a list of images to be pushed
            delim(str): prefix for the progress message of each image
            ignore(bool): True to upload despite any warnings.
        """
        for image in imageList:
            try:
                self.log("%scopying image %s ..." % (delim, image.name), end="")
                imagePath, filename = self.downloadImage(image)
                description = image.imageinfo["comment"]
                try:
                    self.uploadImage(imagePath, filename, description, ignore)
                    self.log("✅")
                except Exception as ex:
                    # an exception raised without arguments has an empty args tuple;
                    # fall back to its repr so that the original failure is reported
                    # instead of being replaced by an IndexError
                    apiWarnings = ex.args[0] if ex.args else repr(ex)
                    self.handleAPIWarnings(apiWarnings, ignoreExists=ignore)
                    if self.debug:
                        self.show_exception(ex)
                if self.debug:
                    print(image.imageinfo)
            except Exception as ex:
                self.handleException(ex, ignore)

    def show_exception(self, ex: Exception):
        """
        Log the given exception; in debug mode the formatted traceback is appended.

        Args:
            ex(Exception): the exception to show
        """
        parts = [f"❌: {str(ex)}"]
        if self.debug:
            # the traceback goes on the lines below the message
            parts.append(traceback.format_exc())
        self.log("\n".join(parts))

    def handleException(self, ex, ignoreExists=False):
        """
        Report the given exception via handleWarning using the error marker.

        The exception is ignored if its message includes "exists" and ignoreExists is True.

        Args:
            ex(Exception): the exception to handle
            ignoreExists(bool): True if "exists" should be ignored

        Returns:
            bool: True if the exception was handled as ok False if it was logged as an error
        """
        return self.handleWarning(str(ex), marker="❌", ignoreExists=ignoreExists)

    def handleAPIWarnings(self, warnings, ignoreExists=False):
        """
        Combine the given API warnings into a single message and pass it to handleWarning.

        Args:
            warnings(list): a list of API warnings or a single warning string
            ignoreExists(bool): ignore messages that warn about existing content

        Returns:
            bool: True if the exception was handled as ok False if it was logged as an error
        """
        if not warnings:
            msg = ""
        elif isinstance(warnings, str):
            msg = warnings
        else:
            # one line per warning
            msg = "".join("%s\n" % str(warning) for warning in warnings)
        return self.handleWarning(msg, ignoreExists=ignoreExists)

    def handleWarning(self, msg, marker="⚠️", ignoreExists=False):
        """
        Log the given warning; a warning that includes "exists" is treated as ok if ignoreExists is True.

        Args:
            msg(string): the warning to handle
            marker(string): the marker to use for the message
            ignoreExists(bool): True if "exists" should be ignored

        Returns:
            bool: True if the exception was handled as ok False if it was logged as an error
        """
        if "exists" in msg:
            if ignoreExists:
                # shorten exact duplicate message
                if "exact duplicate" in msg:
                    msg = "exact duplicate"
                marker = "👀"
            else:
                msg = "file exists (to overwrite existing files enable the ignore parameter)"
        self.log("%s:%s" % (marker, msg))
        return marker == "👀"

    def downloadImage(self, image, downloadPath=None):
        """
        download the given image

        The namespace prefix of the image name (e.g. "File:") is stripped to get
        the local filename. Only the leading prefix is removed so that the same
        text occurring later in the name is kept.

        Args:
            image(image): the image to download
            downloadPath(str): the path to download to if None getDownloadPath will be used

        Returns:
            tuple: the path of the downloaded file and the filename without the namespace prefix
        """
        original_filename = image.name
        prefixes = ["File", "Datei", "Fichier", "Archivo", "Файл", "文件", "ファイル"]
        filename = original_filename  # Fallback in case no prefix matches
        for prefix in prefixes:
            namespace = f"{prefix}:"
            if original_filename.startswith(namespace):
                # strip the leading namespace only - str.replace would also remove
                # later occurrences such as in "File:MyFile:v2.png"
                filename = original_filename[len(namespace) :]
                break

        if downloadPath is None:
            downloadPath = self.getDownloadPath()
        imagePath = "%s/%s" % (downloadPath, filename)
        self.ensureParentDirectoryExists(imagePath)
        with open(imagePath, "wb") as imageFile:
            image.download(imageFile)
        return imagePath, filename

    def uploadImage(self, imagePath, filename, description, ignoreExists=False):
        """
        Upload the image file at the given path via the site of the toWiki.

        Args:
            imagePath(str): the path to the image
            filename(str): the filename to use
            description(str): the description to use
            ignoreExists(bool): True if it should be ignored if the image exists

        Raises:
            Exception: if the upload response contains warnings - the warnings are
                the argument of the exception
        """
        with open(imagePath, "rb") as imageFile:
            response = self.toWiki.site.upload(
                imageFile, filename, description, ignoreExists
            )
            warnings = response["warnings"] if "warnings" in response else None
            # warnings of the upload entry take precedence over top level warnings
            if "upload" in response and "warnings" in response["upload"]:
                uploadWarnings = response["upload"]["warnings"]
                warnings = [str(item) for item in uploadWarnings.items()]
            if warnings:
                raise Exception(warnings)

    def restore(self, pageTitles=None, backupPath=None, listFile=None, stdIn=False):
        """
        restore given page titles from local backup
        If no page titles are given the whole backup is restored.

        Args:
            pageTitles(list): a list of pageTitles to be restored to toWiki. If None -> full restore of backup
            backupPath(str): path to backup location
            listFile(str): path of a file with one backup file path per line - the page
                title is the base name of each line without the ".wiki" extension
            stdIn(bool): True if pageTitles holds backup file paths - the backupPath is
                then taken from the directory of the first entry
        """
        wikiSuffix = ".wiki"

        def toPageTitle(filePath: str) -> str:
            """derive the page title from the given backup file path"""
            name = os.path.basename(filePath.strip())
            # only strip the extension - ".wiki" inside of a title belongs to the title
            if name.endswith(wikiSuffix):
                name = name[: -len(wikiSuffix)]
            return name

        if stdIn:
            backupPath = os.path.dirname(pageTitles[0].strip())
            pageTitles = [toPageTitle(entry) for entry in pageTitles]
        elif listFile is not None:
            # the context manager makes sure the list file is closed again
            with open(listFile, "r") as listing:
                pageTitles = [toPageTitle(line) for line in listing]
        else:
            if backupPath is None:
                backupPath = self.getHomePath(f"wikibackup/{self.toWikiId}")
            if pageTitles is None:
                pageTitles = []
                for path, _subdirs, files in os.walk(backupPath):
                    for name in files:
                        # path of the file relative to the backup directory
                        filename = os.path.join(path, name)[len(backupPath) + 1 :]
                        if filename.endswith(wikiSuffix):
                            pageTitles.append(filename[: -len(wikiSuffix)])
        total = len(pageTitles)
        self.log(
            "restoring %d pages from %s to %s" % (total, backupPath, self.toWikiId)
        )
        for i, pageTitle in enumerate(pageTitles):
            try:
                self.log(
                    "%d/%d (%4.0f%%): restore %s ..."
                    % (i + 1, total, (i + 1) / total * 100, pageTitle),
                    end="",
                )
                wikiFilePath = f"{backupPath}/{pageTitle}.wiki"
                with open(wikiFilePath, mode="r") as wikiFile:
                    page_content = wikiFile.read()
                    page = self.toWiki.getPage(pageTitle)
                    page.edit(
                        page_content,
                        f"modified through wikirestore by {self.toWiki.wikiUser.user}",
                    )
                self.log("✅")
            except Exception as ex:
                # a failure for one page must not stop the restore of the others
                self.show_exception(ex)

__init__(fromWikiId, toWikiId=None, login=False, verbose=True, debug=False)

Constructor

Parameters:

Name Type Description Default
fromWikiId(str)

the id of the wiki to push from (source)

required
toWikiId(str)

the id of the wiki to push to (target)

required
login(bool)

if True login to source wiki

required
verbose(bool)

if True print info messages

required
debug(bool)

if True show debugging messages

required
Source code in wikibot3rd/wikipush.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __init__(
    self,
    fromWikiId: str,
    toWikiId: str = None,
    login: bool = False,
    verbose: bool = True,
    debug: bool = False,
):
    """
    Constructor

    Args:
        fromWikiId(str): the id of the wiki to push from (source)
        toWikiId(str): the id of the wiki to push to (target)
        login(bool): if True login to source wiki
        verbose(bool): if True print info messages
        debug(bool): if True show debugging messages

    Raises:
        Exception: if the login to the source or to the target wiki fails
    """
    self.verbose = verbose
    self.debug = debug
    self.fromWiki = None
    self.toWiki = None

    # a wiki client is only created for the wiki ids that are given
    self.fromWikiId = fromWikiId
    hasSource = self.fromWikiId is not None
    if hasSource:
        self.fromWiki = WikiClient.ofWikiId(fromWikiId, debug=self.debug)
    self.toWikiId = toWikiId
    if self.toWikiId is not None:
        self.toWiki = WikiClient.ofWikiId(toWikiId, debug=self.debug)
    # the source login is optional while a given target wiki is always logged in to
    if login and hasSource and not self.fromWiki.login():
        raise Exception(f"can't login to source Wiki {fromWikiId}")
    if self.toWiki is not None and not self.toWiki.login():
        raise Exception(f"can't login to target Wiki {toWikiId}")

backup(pageTitles, backupPath=None, git=False, withImages=False)

backup the given page titles Args: pageTitles(list): a list of page titles to be downloaded from the fromWiki git(bool): True if git should be used as a version control system withImages(bool): True if the image on a page should also be copied

Source code in wikibot3rd/wikipush.py
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def backup(self, pageTitles, backupPath=None, git=False, withImages=False):
    """
    Download the given pages of the fromWiki as .wiki files into a backup directory.

    Args:
        pageTitles(list): a list of page titles to be downloaded from the fromWiki
        backupPath(str): the directory to store the backup in - if None the home
            path "wikibackup/<fromWikiId>" is used
        git(bool): True if git should be used as a version control system
        withImages(bool): True if the image on a page should also be copied
    """
    if backupPath is None:
        backupPath = self.getHomePath(f"wikibackup/{self.fromWikiId}")
    imageBackupPath = f"{backupPath}/images"
    total = len(pageTitles)
    self.log(f"downloading {total} pages from {self.fromWikiId} to {backupPath}")
    for index, pageTitle in enumerate(pageTitles, 1):
        try:
            progress = f"{index}/{total} ({index / total * 100:4.0f}%)"
            self.log(f"{progress}: downloading {pageTitle} ...", end="")
            page = self.fromWiki.getPage(pageTitle)
            wikiFilePath = f"{backupPath}/{pageTitle}.wiki"
            self.ensureParentDirectoryExists(wikiFilePath)
            with open(wikiFilePath, "w") as wikiFile:
                wikiFile.write(page.text())
            self.log("✅")
            # an image page is backed up itself; withImages adds the images of the page
            if isinstance(page, Image):
                self.backupImages([page], imageBackupPath)
            if withImages:
                self.backupImages(page.images(), imageBackupPath)
        except Exception as ex:
            self.show_exception(ex)
    if git:
        gitPath = f"{backupPath}/.git"
        if os.path.isdir(gitPath):
            repo = Repo(backupPath)
        else:
            self.log("initializing git repository ...")
            repo = Repo.init(backupPath)
        self.log("committing to git repository")
        repo.git.add(all=True)
        timestamp = datetime.datetime.now().isoformat()
        repo.index.commit(f"auto commit by wikibackup at {timestamp}")

backupImages(imageList, imageBackupPath)

backup the images in the given imageList

Parameters:

Name Type Description Default
imageList(list)

the list of images

required
imageBackupPath(str)

the path to the backup directory

required
Source code in wikibot3rd/wikipush.py
477
478
479
480
481
482
483
484
485
486
487
488
489
def backupImages(self, imageList: list, imageBackupPath: str):
    """
    Download each image of the given imageList into the image backup directory.

    A failing download is reported via handleException and the remaining
    images are still processed.

    Args:
        imageList(list): the list of images
        imageBackupPath(str): the path to the backup directory
    """
    for image in imageList:
        try:
            # the download result is not used any further
            _imagePath, _filename = self.downloadImage(image, imageBackupPath)
        except Exception as downloadError:
            self.handleException(downloadError)

convertToCSV(pageRecords, separator=';')

Converts the given pageRecords into a str in csv format ToDO: Currently does not support escaping of the separator and escaping of quotes Args: pageRecords: dict of dicts containing the printouts separator(char): Returns: str

Source code in wikibot3rd/wikipush.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def convertToCSV(self, pageRecords, separator=";"):
    """
    Converts the given pageRecords into a str in csv format

    The keys of the first record are used for the header line. The values of each
    record are joined with the separator to form one line per record.
    ToDO: Currently does not support escaping of the separator and escaping of quotes

    Args:
        pageRecords: dict of dicts containing the printouts
        separator(str): the separator to put between the columns - it may consist
            of more than one character

    Returns:
        str: the header line followed by one line per record - empty if there are no records
    """
    lines = []
    for pageRecord in pageRecords.values():
        if not lines:
            # the header line is derived from the first record only
            lines.append(separator.join(str(key) for key in pageRecord))
        # joining avoids a trailing separator that would have to be cut off again,
        # so separators of any length and empty records are handled properly
        lines.append(separator.join(str(printout) for printout in pageRecord.values()))
    return "".join(f"{line}\n" for line in lines)

downloadImage(image, downloadPath=None)

download the given image

Parameters:

Name Type Description Default
image(image)

the image to download

required
downloadPath(str)

the path to download to if None getDownloadPath will be used

required
Source code in wikibot3rd/wikipush.py
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
def downloadImage(self, image, downloadPath=None):
    """
    download the given image

    The namespace prefix of the image name (e.g. "File:") is stripped to get
    the local filename. Only the leading prefix is removed so that the same
    text occurring later in the name is kept.

    Args:
        image(image): the image to download
        downloadPath(str): the path to download to if None getDownloadPath will be used

    Returns:
        tuple: the path of the downloaded file and the filename without the namespace prefix
    """
    original_filename = image.name
    prefixes = ["File", "Datei", "Fichier", "Archivo", "Файл", "文件", "ファイル"]
    filename = original_filename  # Fallback in case no prefix matches
    for prefix in prefixes:
        namespace = f"{prefix}:"
        if original_filename.startswith(namespace):
            # strip the leading namespace only - str.replace would also remove
            # later occurrences such as in "File:MyFile:v2.png"
            filename = original_filename[len(namespace) :]
            break

    if downloadPath is None:
        downloadPath = self.getDownloadPath()
    imagePath = "%s/%s" % (downloadPath, filename)
    self.ensureParentDirectoryExists(imagePath)
    with open(imagePath, "wb") as imageFile:
        image.download(imageFile)
    return imagePath, filename

edit(pageTitles, modify=None, context=1, force=False)

edit the pages with the given page Titles

Parameters:

Name Type Description Default
pageTitles(list)

a list of page titles to be edited in the toWiki

required
modify Callable[[str], str]

String modify function that takes as input the string and returns the modified string

None
context int

The number of context lines

1
force(bool)

True if pages should actually be edited - a dry run that only shows the differences is the default

required
Source code in wikibot3rd/wikipush.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def edit(
    self,
    pageTitles: typing.List[str],
    modify: typing.Callable[[str], str] = None,
    context: int = 1,
    force: bool = False,
):
    """
    Apply the modify function to the pages of the toWiki with the given titles.

    Args:
        pageTitles(list): a list of titles of the pages to be edited in the toWiki
        modify: String modify function that takes as input the string and returns the modified string
        context: The number of context lines shown in the diff of a dry run
        force(bool): True if the pages should actually be edited - a dry run that only
            shows the differences is the default

    Raises:
        Exception: if no modify function is given
    """
    if modify is None:
        raise Exception("wikipush edit needs a modify function!")
    total = len(pageTitles)
    mode = "forced" if force else "dry run"
    self.log(f"editing {total} pages in {self.toWikiId} ({mode})")
    for index, pageTitle in enumerate(pageTitles, 1):
        try:
            progress = f"{index}/{total} ({index / total * 100:4.0f}%)"
            self.log(f"{progress}: editing {pageTitle} ...", end="")
            pageToBeEdited = self.toWiki.getPage(pageTitle)
            if not (force or pageToBeEdited.exists):
                # dry run for a page that does not exist
                self.log("👎")
                continue
            text = pageToBeEdited.text()
            newText = modify(text)
            if newText == text:
                self.log("↔")
            elif force:
                pageToBeEdited.edit(newText, "edited by wikiedit")
                self.log("✅")
            else:
                diffStr = self.getDiff(text, newText, n=context)
                self.log(f"👍{diffStr}")
        except Exception as ex:
            self.show_exception(ex)

edit_wikison(page_titles, entity_type_name, property_name, value, force=False)

Edit the WikiSON for on the given pages Args: page_titles: a list of page titles to be edited entity_type_name: name of the WikiSON entity type property_name: name of the property to edit value: value to set. If None property is deleted from the WikiSON force: If False only print the changes. Otherwise, apply the changes

Source code in wikibot3rd/wikipush.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
def edit_wikison(
    self,
    page_titles: typing.List[str],
    entity_type_name: str,
    property_name: str,
    value: typing.Any,
    force: bool = False,
):
    """
    Set a WikiSON property on the given pages of the toWiki.

    Args:
        page_titles: a list of page titles to be edited
        entity_type_name: name of the WikiSON entity type
        property_name: name of the property to edit
        value: value to set. If None property is deleted from the WikiSON
        force: If False only print the changes. Otherwise, apply the changes
    """
    total = len(page_titles)
    mode = "forced" if force else "dry run"
    self.log(f"editing {total} pages in {self.toWikiId} ({mode})")
    for position, page_title in enumerate(page_titles, 1):
        try:
            percent = position / total * 100
            self.log(
                f"{position}/{total} ({percent:.2f}%): editing {page_title} ...",
                end="",
            )
            page_to_be_edited = self.toWiki.getPage(page_title)
            if not (force or page_to_be_edited.exists):
                # dry run for a page that does not exist
                self.log("👎")
                continue
            markup = page_to_be_edited.text()
            new_markup = WikiSON(page_title, markup).set(
                entity_type_name=entity_type_name, record={property_name: value}
            )
            if new_markup == markup:
                self.log("↔")
            elif force:
                page_to_be_edited.edit(new_markup, "edited by wikiedit")
                self.log("✅")
            else:
                diff_str = self.getDiff(markup, new_markup, n=3)
                self.log(f"👍{diff_str}")
        except Exception as ex:
            self.show_exception(ex)

ensureDirectoryExists(directory)

make sure the given directory exists

Parameters:

Name Type Description Default
directory(str)

the directory to check for existence

required
Source code in wikibot3rd/wikipush.py
583
584
585
586
587
588
589
590
def ensureDirectoryExists(self, directory: str):
    """
    Create the given directory including missing parents.

    An already existing directory is left untouched.

    Args:
        directory(str): the directory that has to exist afterwards
    """
    target = Path(directory)
    target.mkdir(exist_ok=True, parents=True)

ensureParentDirectoryExists(filePath)

for pages that have a "/" in the name make sure that the parent Directory exists

Parameters:

Name Type Description Default
filePath(str)

the filePath to check

required
Source code in wikibot3rd/wikipush.py
573
574
575
576
577
578
579
580
581
def ensureParentDirectoryExists(self, filePath: str):
    """
    Make sure the directory that contains the given file exists.

    This is needed for pages that have a "/" in the name since these
    are mapped to files in sub directories.

    Args:
        filePath(str): the path of the file whose parent directory is needed
    """
    parentDirectory, _fileName = os.path.split(filePath)
    self.ensureDirectoryExists(parentDirectory)

formatQueryResult(askQuery, wiki=None, limit=None, showProgress=False, queryDivision=1, outputFormat='lod', entityName='data', title=None)

format the query result for the given askQuery. Args: askQuery(string): Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries wiki(wikibot3rd): the wiki to query - use fromWiki if not specified limit(int): the limit for the query (optional) showProgress(bool): true if progress of the query retrieval should be indicated (default: one dot per 50 records ...) queryDivision(int): Defines the number of subintervals the query is divided into (must be greater equal 1) outputFormat(str): output format of the query results - default format is lod entityName(str): the name of the entity title(str): the title of the query (if any) Returns: Query results in the requested outputFormat as string. If the requested outputFormat is not supported None is returned.

Source code in wikibot3rd/wikipush.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def formatQueryResult(
    self,
    askQuery,
    wiki=None,
    limit=None,
    showProgress=False,
    queryDivision=1,
    outputFormat="lod",
    entityName="data",
    title: str = None,
):
    """
    Run the given askQuery and format the result.

    Args:
        askQuery(string): Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries
        wiki(wikibot3rd): the wiki to query - use fromWiki if not specified
        limit(int): the limit for the query (optional)
        showProgress(bool): true if progress of the query retrieval should be indicated (default: one dot per 50 records ...)
        queryDivision(int): Defines the number of subintervals the query is divided into (must be greater equal 1)
        outputFormat(str): output format of the query results (case-insensitive) - default format is lod
        entityName(str): the name of the entity
        title(str): the title of the query (if any)

    Returns:
        The query results in the requested outputFormat: a csv string for "csv",
        a json string for "json" and a list of records for "lod". Any other
        format is passed on as tablefmt to Query.documentQueryResult.
    """
    pageRecords = self.queryPages(
        askQuery, wiki, limit, showProgress, queryDivision
    )
    outputFormat = outputFormat.lower()
    if outputFormat == "csv":
        return self.convertToCSV(pageRecords)
    records = list(pageRecords.values())
    if outputFormat == "json":
        return json.dumps({entityName: records}, default=str, indent=3)
    if outputFormat == "lod":
        return records
    # all other formats are handled by the query documentation
    if title is None:
        title = entityName
    query = Query(name=entityName, query=askQuery, title=title)
    return query.documentQueryResult(
        records, limit, tablefmt=outputFormat, withSourceCode=False
    )

getDiff(text, newText, n=1, forHuman=True) staticmethod

Compare the two given strings and return the differences Args: text: old text to compare the new text to newText: new text n: The number of context lines forHuman: If True update the diff string to be better human-readable

Returns:

Name Type Description
str str

difference string

Source code in wikibot3rd/wikipush.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
@staticmethod
def getDiff(text: str, newText: str, n: int = 1, forHuman: bool = True) -> str:
    """
    Compare the two given strings and return the differences

    Args:
        text: old text to compare the new text to
        newText: new text
        n: The number of context lines
        forHuman: If True update the diff string to be better human-readable by
            leaving out the file header and hunk header lines of the unified diff

    Returns:
        str: difference string
    """
    # https://docs.python.org/3/library/difflib.html
    textLines = text.split("\n")
    newTextLines = newText.split("\n")
    diffs = difflib.unified_diff(textLines, newTextLines, n=n)
    if forHuman:
        # a unified diff starts with the two file header lines ("---" and "+++")
        # and each hunk starts with an "@@" line. Content lines always start with
        # " ", "+" or "-" so only real header lines are dropped - a substring
        # check would also hide content such as the wiki markup "----"
        contentAndHunks = list(diffs)[2:]
        hdiffs = [line for line in contentAndHunks if not line.startswith("@@")]
    else:
        hdiffs = diffs
    diffStr = "\n".join(hdiffs)
    return diffStr

getDownloadPath()

get the download path

Source code in wikibot3rd/wikipush.py
600
601
602
603
604
def getDownloadPath(self):
    """
    Get the path used for downloads.

    Returns:
        the result of getHomePath for "Downloads/mediawiki"
    """
    downloadPath = self.getHomePath("Downloads/mediawiki")
    return downloadPath

getHomePath(localPath)

get the given home path

Source code in wikibot3rd/wikipush.py
592
593
594
595
596
597
598
def getHomePath(self, localPath):
    """
    Get the path for localPath below the home directory and make sure it exists.

    Args:
        localPath(str): the path relative to the home directory

    Returns:
        str: the path below the home directory
    """
    homeDirectory = Path.home()
    homePath = "%s/%s" % (homeDirectory, localPath)
    self.ensureDirectoryExists(homePath)
    return homePath

getModify(search, replace, debug=False) staticmethod

get the modification function

Parameters:

Name Type Description Default
search(str)

the search string

required
replace(str)

the replace string

required
debug(bool)

if debug show

required

Returns:

Type Description
Callable[[str], str]

String modify function that takes as input the string, applies the search and replace action and returns the modified string

Source code in wikibot3rd/wikipush.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
@staticmethod
def getModify(
    search: str, replace: str, debug: bool = False
) -> typing.Callable[[str], str]:
    """
    Create a modification function for a regular expression search and replace.

    Args:
        search(str): the search string
        replace(str): the replace string
        debug(bool): if True print the search and the replace string

    Returns:
        String modify function that takes as input the string, applies the search and replace action
         and returns the modified string
    """
    if debug:
        print(f"search regex: {search}")
        print(f"replace regex: {replace}")
    searchRegex = "%s" % search
    replaceRegex = "%s" % replace

    def modify(text):
        """apply the search and replace action to the given text"""
        return re.sub(searchRegex, replaceRegex, text)

    return modify

handleAPIWarnings(warnings, ignoreExists=False)

handle API Warnings

Parameters:

Name Type Description Default
warnings(list)

a list of API warnings

required
ignoreExists(bool)

ignore messages that warn about existing content

required

Returns:

Name Type Description
bool

True if the exception was handled as ok False if it was logged as an error

Source code in wikibot3rd/wikipush.py
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
def handleAPIWarnings(self, warnings, ignoreExists=False):
    """
    Combine the given API warnings into a single message and pass it to handleWarning.

    Args:
        warnings(list): a list of API warnings or a single warning string
        ignoreExists(bool): ignore messages that warn about existing content

    Returns:
        bool: True if the exception was handled as ok False if it was logged as an error
    """
    if not warnings:
        msg = ""
    elif isinstance(warnings, str):
        msg = warnings
    else:
        # one line per warning
        msg = "".join("%s\n" % str(warning) for warning in warnings)
    return self.handleWarning(msg, ignoreExists=ignoreExists)

handleException(ex, ignoreExists=False)

handle the given exception and ignore it if it includes "exists" and ignoreExists is True

Parameters:

Name Type Description Default
ex(Exception)

the exception to handle

required
ignoreExists(bool)

True if "exists" should be ignored

required

Returns:

Name Type Description
bool

True if the exception was handled as ok False if it was logged as an error

Source code in wikibot3rd/wikipush.py
642
643
644
645
646
647
648
649
650
651
652
653
654
def handleException(self, ex, ignoreExists=False):
    """
    Report the given exception via handleWarning using the error marker.

    The exception is ignored if its message includes "exists" and ignoreExists is True.

    Args:
        ex(Exception): the exception to handle
        ignoreExists(bool): True if "exists" should be ignored

    Returns:
        bool: True if the exception was handled as ok False if it was logged as an error
    """
    return self.handleWarning(str(ex), marker="❌", ignoreExists=ignoreExists)

handleWarning(msg, marker='⚠️', ignoreExists=False)

handle the given warning and ignore it if it includes "exists" and ignoreExists is True

Parameters:

Name Type Description Default
msg(string)

the warning to handle

required
marker(string)

the marker to use for the message

required
ignoreExists(bool)

True if "exists" should be ignored

required

Returns:

Name Type Description
bool

True if the exception was handled as ok False if it was logged as an error

Source code in wikibot3rd/wikipush.py
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
def handleWarning(self, msg, marker="⚠️", ignoreExists=False):
    """
    Log the given warning; a warning that includes "exists" is treated as ok if ignoreExists is True.

    Args:
        msg(string): the warning to handle
        marker(string): the marker to use for the message
        ignoreExists(bool): True if "exists" should be ignored

    Returns:
        bool: True if the exception was handled as ok False if it was logged as an error
    """
    if "exists" in msg:
        if ignoreExists:
            # shorten exact duplicate message
            if "exact duplicate" in msg:
                msg = "exact duplicate"
            marker = "👀"
        else:
            msg = "file exists (to overwrite existing files enable the ignore parameter)"
    self.log("%s:%s" % (marker, msg))
    return marker == "👀"

log(msg, end='\n')

show the given message if verbose is on

Parameters:

Name Type Description Default
msg(str)

the message to display

required
Source code in wikibot3rd/wikipush.py
77
78
79
80
81
82
83
84
85
def log(self, msg: str, end="\n"):
    """
    Print the given message unless verbose is switched off.

    Args:
        msg(str): the message to display
        end(str): the text printed after the message
    """
    if not self.verbose:
        return
    print(msg, end=end)

nuke(pageTitles, force=False)

delete the pages with the given page Titles

Parameters:

Name Type Description Default
pageTitles(list)

a list of page titles to be deleted from the toWiki

required
force(bool)

True if pages should be actually deleted - dry run only listing pages is default

required
Source code in wikibot3rd/wikipush.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def nuke(self, pageTitles, force=False):
    """
    Delete the pages with the given titles from the toWiki.

    Without force this is a dry run: each page is only looked up and its
    existence is reported.

    Args:
        pageTitles(list): the titles of the pages to delete
        force(bool): True if pages should be actually deleted - dry run only listing pages is default
    """
    total = len(pageTitles)
    runMode = "forced" if force else "dry run"
    self.log("deleting %d pages in %s (%s)" % (total, self.toWikiId, runMode))
    for position, pageTitle in enumerate(pageTitles, start=1):
        try:
            progress = "%d/%d (%4.0f%%): deleting %s ..." % (
                position,
                total,
                position / total * 100,
                pageTitle,
            )
            self.log(progress, end="")
            candidate = self.toWiki.getPage(pageTitle)
            if force:
                candidate.delete("deleted by wiknuke")
                self.log("✅")
            else:
                # dry run: only report whether the page exists
                self.log("👍" if candidate.exists else "👎")
        except Exception as ex:
            # a failure on one page must not stop the remaining deletions
            self.show_exception(ex)

push(pageTitles, force=False, ignore=False, withImages=False)

push the given page titles

Parameters:

Name Type Description Default
pageTitles(list)

a list of page titles to be transferred from the fromWiki to the toWiki

required
force(bool)

True if pages should be overwritten if they exist

required
ignore(bool)

True if warning for images should be ignored (e.g if they exist)

required
withImages(bool)

True if the image on a page should also be copied

required

Returns: list: a list of pageTitles for which the activity failed

Source code in wikibot3rd/wikipush.py
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
def push(self, pageTitles, force=False, ignore=False, withImages=False) -> list:
    """
    Copy the pages with the given titles by delegating to work().

    Args:
        pageTitles(list): a list of page titles to be pushed
        force(bool): True if pages should be overwritten if they exist
        ignore(bool): True if warning for images should be ignored (e.g if they exist)
        withImages(bool): True if the image on a page should also be copied
    Returns:
        list: whatever work() returns - the pageTitles for which the activity failed
    """
    workOptions = {
        "activity": "copying",
        # edit comment that records where the content came from
        "comment": f"pushed from {self.fromWikiId} by wikipush",
        "force": force,
        "ignore": ignore,
        "withImages": withImages,
    }
    return self.work(pageTitles, **workOptions)

pushImages(imageList, delim='', ignore=False)

push the images in the given image List

Parameters:

Name Type Description Default
imageList(list)

a list of images to be pushed

required
ignore(bool)

True to upload despite any warnings.

required
Source code in wikibot3rd/wikipush.py
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
def pushImages(self, imageList, delim="", ignore=False):
    """
    Download every image of the given list and upload it again.

    Args:
        imageList(list): a list of images to be pushed
        delim(str): prefix for the progress message of each image
        ignore(bool): True to upload despite any warnings.
    """
    for image in imageList:
        try:
            progress = "%scopying image %s ..." % (delim, image.name)
            self.log(progress, end="")
            localPath, targetName = self.downloadImage(image)
            comment = image.imageinfo["comment"]
            try:
                self.uploadImage(localPath, targetName, comment, ignore)
                self.log("✅")
            except Exception as uploadError:
                # upload problems are reported through the API warning handler
                self.handleAPIWarnings(uploadError.args[0], ignoreExists=ignore)
                if self.debug:
                    self.show_exception(uploadError)
            if self.debug:
                print(image.imageinfo)
        except Exception as ex:
            # anything else (e.g. a failing download) goes to the general handler
            self.handleException(ex, ignore)

query(askQuery, wiki=None, queryField=None, limit=None, showProgress=False, queryDivision=1)

query the given wiki for pages matching the given askQuery

Parameters:

Name Type Description Default
askQuery(string)

Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries

required
wiki(wikibot3rd)

the wiki to query - use fromWiki if not specified

required
queryField(string)

the field to select the pageTitle from

required
limit(int)

the limit for the query (optional)

required
showProgress(bool)

true if progress of the query retrieval should be indicated (default: one dot per 50 records ...)

required

Returns: list: a list of pageTitles matching the given askQuery

Source code in wikibot3rd/wikipush.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def query(
    self,
    askQuery,
    wiki=None,
    queryField=None,
    limit=None,
    showProgress=False,
    queryDivision=1,
):
    """
    query the given wiki for pages matching the given askQuery

    Args:
        askQuery(string): Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries
        wiki(wikibot3rd): the wiki to query - passed on unchanged to queryPages
        queryField(string): the field to select the pageTitle from - if None the keys of the page records are used
        limit(int): the limit for the query (optional)
        showProgress(bool): true if progress of the query retrieval should be indicated
        queryDivision(int): the number of subintervals the query is divided into - passed on to queryPages
    Returns:
        list: a list of pageTitles matching the given askQuery
    """
    pageRecords = self.queryPages(
        askQuery, wiki, limit, showProgress, queryDivision
    )
    if queryField is None:
        # materialize the keys so that both branches return a real list as documented
        return list(pageRecords.keys())
    # dict.fromkeys removes duplicates while keeping the first-seen order
    return list(
        dict.fromkeys(
            pageRecord[queryField]
            for pageRecord in pageRecords.values()
            if queryField in pageRecord
        )
    )

queryPages(askQuery, wiki=None, limit=None, showProgress=False, queryDivision=1)

query the given wiki for pagerecords matching the given askQuery

Parameters:

Name Type Description Default
askQuery(string)

Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries

required
wiki(wikibot3rd)

the wiki to query - use fromWiki if not specified

required
limit(int)

the limit for the query (optional)

required
showProgress(bool)

true if progress of the query retrieval should be indicated (default: one dot per 50 records ...)

required
queryDivision(int)

Defines the number of subintervals the query is divided into (must be greater equal 1)

required

Returns: dict: the pageRecords matching the given askQuery

Source code in wikibot3rd/wikipush.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def queryPages(
    self, askQuery, wiki=None, limit=None, showProgress=False, queryDivision=1
) -> dict:
    """
    Run the given askQuery via an SMWClient and hand back its page records.

    Args:
        askQuery(string): Semantic Media Wiki in line query https://www.semantic-mediawiki.org/wiki/Help:Inline_queries
        wiki(wikibot3rd): the wiki to query - use fromWiki if not specified
        limit(int): the limit for the query (optional)
        showProgress(bool): true if progress of the query retrieval should be indicated
        queryDivision(int): Defines the number of subintervals the query is divided into (must be greater equal 1)
    Returns:
        dict: the result of SMWClient.query for the given askQuery
    """
    targetWiki = self.fromWiki if wiki is None else wiki
    client = SMWClient(
        targetWiki.getSite(),
        showProgress=showProgress,
        queryDivision=queryDivision,
        debug=self.debug,
    )
    return client.query(askQuery, limit=limit)

restore(pageTitles=None, backupPath=None, listFile=None, stdIn=False)

restore given page titles from local backup If no page titles are given the whole backup is restored.

Parameters:

Name Type Description Default
pageTitles(list)

a list of pageTitles to be restored to toWiki. If None -> full restore of backup

required
backupPath(str)

path to backup location

required
listFile
None
stdIn
False
Source code in wikibot3rd/wikipush.py
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
def restore(self, pageTitles=None, backupPath=None, listFile=None, stdIn=False):
    """
    restore given page titles from local backup
    If no page titles are given the whole backup is restored.

    Args:
        pageTitles(list): a list of pageTitles to be restored to toWiki. If None -> full restore of backup.
            In stdIn mode these are backup file paths, one per entry.
        backupPath(str): path to backup location; defaults to the home path
            wikibackup/<toWikiId> when not given (ignored in stdIn mode, where the
            directory of the first entry is used)
        listFile(str): path of a text file naming one backup file per line;
            only used when stdIn is False
        stdIn(bool): True if pageTitles holds raw lines of backup file paths
    """
    suffix = ".wiki"

    def toPageTitle(entry):
        # keep only the file name and strip nothing but a trailing ".wiki",
        # so titles that contain ".wiki" elsewhere stay intact
        name = os.path.basename(entry.strip())
        if name.endswith(suffix):
            name = name[: -len(suffix)]
        return name

    if stdIn:
        # blank lines carry no page; an empty input simply restores nothing
        entries = [entry for entry in (pageTitles or []) if entry.strip()]
        if entries:
            backupPath = os.path.dirname(entries[0].strip())
        pageTitles = [toPageTitle(entry) for entry in entries]
    else:
        if backupPath is None:
            # also applies to listFile mode, which would otherwise read from "None/..."
            backupPath = self.getHomePath(f"wikibackup/{self.toWikiId}")
        if listFile is not None:
            # the context manager closes the list file again
            with open(listFile, "r") as listHandle:
                pageTitles = [
                    toPageTitle(line) for line in listHandle if line.strip()
                ]
        elif pageTitles is None:
            # full restore: every *.wiki file below backupPath is a page,
            # sub directories become part of the page title
            pageTitles = []
            for path, _subdirs, files in os.walk(backupPath):
                for name in files:
                    filename = os.path.relpath(os.path.join(path, name), backupPath)
                    if filename.endswith(suffix):
                        pageTitles.append(filename[: -len(suffix)])
    total = len(pageTitles)
    self.log(
        "restoring %d pages from %s to %s" % (total, backupPath, self.toWikiId)
    )
    for i, pageTitle in enumerate(pageTitles):
        try:
            self.log(
                "%d/%d (%4.0f%%): restore %s ..."
                % (i + 1, total, (i + 1) / total * 100, pageTitle),
                end="",
            )
            # os.path.join keeps an empty backupPath relative instead of yielding "/<title>.wiki"
            wikiFilePath = os.path.join(backupPath, f"{pageTitle}.wiki")
            with open(wikiFilePath, mode="r") as wikiFile:
                page_content = wikiFile.read()
            page = self.toWiki.getPage(pageTitle)
            page.edit(
                page_content,
                f"modified through wikirestore by {self.toWiki.wikiUser.user}",
            )
            self.log("✅")
        except Exception as ex:
            # a failure on one page must not stop the remaining restores
            self.show_exception(ex)

show_exception(ex)

Show the given exception and, if debug mode is on, show the traceback.

Source code in wikibot3rd/wikipush.py
631
632
633
634
635
636
637
638
639
640
def show_exception(self, ex: Exception):
    """
    Log the given exception; in debug mode the formatted traceback is appended.

    Args:
        ex(Exception): the exception to report
    """
    report = f"❌: {str(ex)}"
    if self.debug:
        # traceback.format_exc() formats the exception currently being handled
        report = "\n".join([report, traceback.format_exc()])
    self.log(report)

upload(files, force=False)

push the given files to the toWiki. Args: files(list): a list of filenames to be transferred to the toWiki; force(bool): True if images should be overwritten if they exist

Source code in wikibot3rd/wikipush.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
def upload(self, files, force=False):
    """
    Upload the given files one by one via uploadImage.

    Args:
        files(list): a list of filenames to be uploaded to the toWiki
        force(bool): True if images should be overwritten if they exist
    """
    total = len(files)
    self.log("uploading %d files to %s" % (total, self.toWikiId))
    description = "uploaded by wikiupload"
    for position, filePath in enumerate(files, start=1):
        try:
            progress = "%d/%d (%4.0f%%): uploading %s ..." % (
                position,
                total,
                position / total * 100,
                filePath,
            )
            self.log(progress, end="")
            # the upload is stored under the bare file name
            self.uploadImage(
                filePath, os.path.basename(filePath), description, force
            )
            self.log("✅")
        except Exception as ex:
            # a failure on one file must not stop the remaining uploads
            self.show_exception(ex)

uploadImage(imagePath, filename, description, ignoreExists=False)

upload an image

Parameters:

Name Type Description Default
imagePath(str)

the path to the image

required
filename(str)

the filename to use

required
description(str)

the description to use

required
ignoreExists(bool)

True if it should be ignored if the image exists

required
Source code in wikibot3rd/wikipush.py
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
def uploadImage(self, imagePath, filename, description, ignoreExists=False):
    """
    Upload the image file to the toWiki and fail if the response carries warnings.

    Args:
        imagePath(str): the path to the image
        filename(str): the filename to use
        description(str): the description to use
        ignoreExists(bool): True if it should be ignored if the image exists

    Raises:
        Exception: carrying the warnings found in the upload response
    """
    with open(imagePath, "rb") as imageFile:
        response = self.toWiki.site.upload(
            imageFile, filename, description, ignoreExists
        )
        # warnings may be reported at the top level of the response ...
        problems = response["warnings"] if "warnings" in response else None
        # ... or nested below "upload", which takes precedence
        if "upload" in response and "warnings" in response["upload"]:
            problems = [str(entry) for entry in response["upload"]["warnings"].items()]
        if problems:
            raise Exception(problems)

work(pageTitles, activity='copying', comment='pushed', force=False, ignore=False, withImages=False)

work on the given page titles

Parameters:

Name Type Description Default
pageTitles(list)

a list of page titles to be transferred from the fromWiki to the toWiki

required
activity(str)

the activity to perform

required
comment(str)

the comment to display

required
force(bool)

True if pages should be overwritten if they exist

required
ignore(bool)

True if warning for images should be ignored (e.g if they exist)

required
withImages(bool)

True if the image on a page should also be copied

required

Returns: list: a list of pageTitles for which the activity failed

Source code in wikibot3rd/wikipush.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
def work(
    self,
    pageTitles: list,
    activity: str = "copying",
    comment: str = "pushed",
    force: bool = False,
    ignore: bool = False,
    withImages: bool = False,
) -> list:
    """
    Copy each of the given pages from the fromWiki to the toWiki.

    Args:
        pageTitles(list): a list of page titles to be processed
        activity(str): the name of the activity shown in the progress messages
        comment(str): the edit comment used when writing a page
        force(bool): True if pages should be overwritten if they exist
        ignore(bool): True if warning for images should be ignored (e.g if they exist)
        withImages(bool): True if the image on a page should also be copied
    Returns:
        list: a list of pageTitles for which the activity failed
    """
    failed = []
    total = len(pageTitles)
    self.log(f"{activity} {total} pages from {self.fromWikiId} to {self.toWikiId}")
    for i, pageTitle in enumerate(pageTitles):
        try:
            percent = (i + 1) / total * 100
            self.log(
                f"{i+1}/{total} ({percent:4.0f}%): {activity} ... {pageTitle}",
                end="",
            )
            page = self.fromWiki.getPage(pageTitle)
            if not page.exists:
                # nothing to copy - counts as a failure
                self.log("❌")
                failed.append(pageTitle)
                continue
            if isinstance(page, Image):
                # image pages are transferred as uploads
                self.pushImages([page], ignore=ignore)
                continue
            newPage = self.toWiki.getPage(pageTitle)
            if newPage.exists and not force:
                # existing target pages are only overwritten when forced
                self.log("👎")
                continue
            try:
                newPage.edit(page.text(), comment)
                self.log("✅")
                pageOk = True
            except Exception as ex:
                pageOk = self.handleException(ex, ignore)
                if not pageOk:
                    failed.append(pageTitle)
            if withImages and pageOk:
                self.pushImages(page.images(), ignore=ignore)
        except Exception as ex:
            self.show_exception(ex)
            failed.append(pageTitle)
    return failed

main(argv=None, mode='wikipush')

main program.

Source code in wikibot3rd/wikipush.py
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
def main(argv=None, mode="wikipush"):  # IGNORE:C0111
    """
    Command line entry point shared by the wikipush family of tools.

    Args:
        argv(list): the command line arguments - sys.argv[1:] is used if None
        mode(str): the tool to run: wikipush, wikibackup, wikinuke, wikiedit,
            wikiquery, wikiupload or wikirestore

    Returns:
        int: 1 after a keyboard interrupt, 2 if an exception was reported;
            None if the run finished without an exception
    """

    if argv is None:
        argv = sys.argv[1:]

    program_name = mode
    program_version = "v%s" % __version__
    program_build_date = str(__updated__)
    program_version_message = "%%(prog)s %s (%s)" % (
        program_version,
        program_build_date,
    )
    program_shortdesc = "wikipush"
    user_name = "Wolfgang Fahl"

    program_license = """%s

  Created by %s on %s.
  Copyright 2020-2024 Wolfgang Fahl. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied.

""" % (
        program_shortdesc,
        user_name,
        str(__date__),
    )

    # stays None until parsing succeeded so that the error handler below
    # never reads an unbound name
    args = None
    try:
        # Setup argument parser
        parser = ArgumentParser(
            description=program_license, formatter_class=RawDescriptionHelpFormatter
        )
        parser.add_argument(
            "-d",
            "--debug",
            dest="debug",
            action="store_true",
            help="set debug level [default: %(default)s]",
        )
        parser.add_argument(
            "-V", "--version", action="version", version=program_version_message
        )
        # mode specific options
        if mode == "wikipush":
            parser.add_argument(
                "-l",
                "--login",
                dest="login",
                action="store_true",
                help="login to source wiki for access permission",
            )
            parser.add_argument(
                "-s", "--source", dest="source", help="source wiki id", required=True
            )
            parser.add_argument(
                "-f",
                "--force",
                dest="force",
                action="store_true",
                help="force to overwrite existing pages",
            )
            parser.add_argument(
                "-i",
                "--ignore",
                dest="ignore",
                action="store_true",
                help="ignore upload warnings e.g. duplicate images",
            )
            parser.add_argument(
                "-wi",
                "--withImages",
                dest="withImages",
                action="store_true",
                help="copy images on the given pages",
            )
        elif mode == "wikibackup":
            parser.add_argument(
                "-g",
                "--git",
                dest="git",
                action="store_true",
                help="use git for version control",
            )
            parser.add_argument(
                "-l",
                "--login",
                dest="login",
                action="store_true",
                help="login to source wiki for access permission",
            )
            parser.add_argument(
                "-s", "--source", dest="source", help="source wiki id", required=True
            )
            parser.add_argument(
                "-wi",
                "--withImages",
                dest="withImages",
                action="store_true",
                help="copy images on the given pages",
            )
            parser.add_argument(
                "--backupPath",
                dest="backupPath",
                help="path where the backup should be stored",
                required=False,
            )
        elif mode == "wikinuke":
            parser.add_argument(
                "-f",
                "--force",
                dest="force",
                action="store_true",
                help="force to delete pages - default is 'dry' run only listing pages",
            )
        elif mode == "wikiedit":
            parser.add_argument(
                "--search", dest="search", help="search pattern", required=False
            )
            parser.add_argument(
                "--replace", dest="replace", help="replace pattern", required=False
            )
            parser.add_argument(
                "--context",
                dest="context",
                type=int,
                help="number of context lines to show in dry run diff display",
                default=1,
            )
            parser.add_argument(
                "-f",
                "--force",
                dest="force",
                action="store_true",
                help="force to edit pages - default is 'dry' run only listing pages",
            )
            parser.add_argument(
                "--template",
                dest="template",
                help="Name of the template to edit",
                required=False,
            )
            parser.add_argument(
                "--property",
                dest="property",
                help="Name of the property in the template to edit",
                required=False,
            )
            parser.add_argument(
                "--value",
                dest="value",
                help="Value of the Property. If not set but property name is given the property is removed from the template",
                required=False,
            )
        elif mode == "wikiquery":
            parser.add_argument(
                "-l",
                "--login",
                dest="login",
                action="store_true",
                help="login to source wiki for access permission",
            )
            parser.add_argument(
                "-s", "--source", dest="source", help="source wiki id", required=True
            )
            parser.add_argument(
                "--format",
                dest="format",
                default="json",
                help="format to use for query result csv,json,lod or any of the tablefmt options of https://pypi.org/project/tabulate/",
            )
            parser.add_argument(
                "--entityName",
                dest="entityName",
                default="data",
                help="name of the entites that are queried - only needed for some output formats - default is 'data'",
            )
        elif mode == "wikiupload":
            parser.add_argument(
                "--files", nargs="+", help="list of files to be uploaded", required=True
            )
            parser.add_argument(
                "-f",
                "--force",
                dest="force",
                action="store_true",
                help="force to (re)upload existing files - default is false",
            )
        elif mode == "wikirestore":
            parser.add_argument(
                "--listFile",
                dest="listFile",
                help="List of pages to restore",
                required=False,
            )
            parser.add_argument(
                "--backupPath",
                dest="backupPath",
                help="path the backup is stored",
                required=False,
            )
            parser.add_argument(
                "-s", "--source", dest="source", help="source wiki id", required=False
            )
            parser.add_argument(
                "-l",
                "--login",
                dest="login",
                action="store_true",
                help="login to source wiki for access permission",
            )
            parser.add_argument(
                "-stdinp",
                dest="stdinp",
                action="store_true",
                help="Use the input from STD IN using pipes",
            )
        # page selection options shared by all modes that work on page lists
        if mode in [
            "wikipush",
            "wikiedit",
            "wikinuke",
            "wikibackup",
            "wikiquery",
            "wikirestore",
        ]:
            parser.add_argument(
                "--limit", dest="limit", type=int, help="limit for query"
            )
            parser.add_argument(
                "--progress",
                dest="showProgress",
                action="store_true",
                help="shows progress for query",
            )
            parser.add_argument(
                "-q",
                "--query",
                dest="query",
                help="select pages with given SMW ask query",
                required=False,
            )
            parser.add_argument(
                "--queryFile",
                dest="queryFile",
                help="file the query should be read from",
            )
            parser.add_argument(
                "-qf",
                "--queryField",
                dest="queryField",
                help="query result field which contains page",
            )
            parser.add_argument(
                "-p",
                "--pages",
                nargs="+",
                help="list of page Titles to be pushed",
                required=False,
            )
            parser.add_argument(
                "-ui",
                "--withGUI",
                dest="ui",
                help="Pop up GUI for selection",
                action="store_true",
                required=False,
            )
            parser.add_argument(
                "-qd",
                "--queryDivision",
                default=1,
                dest="queryDivision",
                type=int,
                help="divide query into equidistant subintervals to limit the result size of the individual queries",
                required=False,
            )
        if mode in ["wikiquery"]:
            parser.add_argument("--title", help="the title for the query")
        if mode not in ["wikibackup", "wikiquery"]:
            parser.add_argument(
                "-t", "--target", dest="target", help="target wiki id", required=True
            )
        # Process arguments
        args = parser.parse_args(argv)
        if hasattr(args, "queryDivision"):
            if args.queryDivision < 1:
                raise ValueError("queryDivision argument must be greater equal 1")

        # create the WikiPush instance and pick the wiki that queries run against
        if mode == "wikipush":
            wikipush = WikiPush(
                args.source, args.target, login=args.login, debug=args.debug
            )
            queryWiki = wikipush.fromWiki
        elif mode == "wikibackup":
            wikipush = WikiPush(args.source, None, login=args.login, debug=args.debug)
            queryWiki = wikipush.fromWiki
        elif mode == "wikiquery":
            wikipush = WikiPush(args.source, None, login=args.login, debug=args.debug)
            queryWiki = wikipush.fromWiki
        elif mode == "wikiupload":
            wikipush = WikiPush(None, args.target, debug=args.debug)
        elif mode == "wikirestore":
            wikipush = WikiPush(
                args.source, args.target, login=args.login, debug=args.debug
            )
            queryWiki = wikipush.fromWiki
        else:
            wikipush = WikiPush(None, args.target, debug=args.debug)
            queryWiki = wikipush.toWiki
        if mode == "wikiupload":
            wikipush.upload(args.files, args.force)
        else:
            # determine the pages to work on: explicit list, stdin or query
            pages = None
            if args.pages:
                pages = args.pages
            elif getattr(args, "stdinp", False):
                # only taken when -stdinp was actually given; testing for the mere
                # presence of the option would hide the query branch below
                # for every wikirestore call
                pages = sys.stdin.readlines()
            elif args.query or args.queryFile:
                if args.query:
                    query = args.query
                else:
                    with open(args.queryFile, "r") as queryFile:
                        query = queryFile.read()
                if mode == "wikiquery":
                    formatedQueryResults = wikipush.formatQueryResult(
                        query,
                        wiki=queryWiki,
                        limit=args.limit,
                        showProgress=args.showProgress,
                        queryDivision=args.queryDivision,
                        outputFormat=args.format,
                        entityName=args.entityName,
                        title=args.title,
                    )
                    if formatedQueryResults:
                        print(formatedQueryResults)
                    else:
                        print(f"Format {args.format} is not supported.")
                else:
                    pages = wikipush.query(
                        query,
                        wiki=queryWiki,
                        queryField=args.queryField,
                        limit=args.limit,
                        showProgress=args.showProgress,
                        queryDivision=args.queryDivision,
                    )
            if pages is None:
                if mode == "wikiquery":
                    # we are finished
                    pass
                elif mode == "wikirestore":
                    # no page selection at all: restore from listFile or the whole backup
                    if (
                        args.pages is None
                        and args.queryFile is None
                        and args.query is None
                    ):
                        wikipush.restore(
                            pageTitles=None,
                            backupPath=args.backupPath,
                            listFile=args.listFile,
                        )
                else:
                    raise Exception(
                        "no pages specified - you might want to use the -p, -q or --queryFile option"
                    )
            else:
                if args.ui and len(pages) > 0:
                    # derive the action label by removing the "wiki" prefix;
                    # str.lstrip would strip a character set, not a prefix
                    actionName = mode[len("wiki") :] if mode.startswith("wiki") else mode
                    pages = Selector.select(
                        pages,
                        action=actionName[:1].upper() + actionName[1:],
                        description="GUI program for the mode " + mode,
                        title=mode,
                    )
                    if pages == "Q":  # If GUI window is closed, end the program
                        sys.exit(0)
                if mode == "wikipush":
                    wikipush.push(
                        pages,
                        force=args.force,
                        ignore=args.ignore,
                        withImages=args.withImages,
                    )
                elif mode == "wikibackup":
                    wikipush.backup(
                        pages,
                        git=args.git,
                        withImages=args.withImages,
                        backupPath=args.backupPath,
                    )
                elif mode == "wikinuke":
                    wikipush.nuke(pages, force=args.force)
                elif mode == "wikiedit":
                    # two modes search&replace and WikiSON property edit
                    if args.search or args.replace:
                        # search&replace
                        if args.search and args.replace:
                            modify = WikiPush.getModify(
                                args.search, args.replace, args.debug
                            )
                            wikipush.edit(
                                pages,
                                modify=modify,
                                context=args.context,
                                force=args.force,
                            )
                        else:
                            raise Exception(
                                "In wikiedit search&replace mode both args '--search' and '--replace' need to be set"
                            )
                    else:
                        # WikiSON property edit
                        if len(pages) > 0 and args.template and args.property:
                            wikipush.edit_wikison(
                                page_titles=pages,
                                entity_type_name=args.template,
                                property_name=args.property,
                                value=args.value,
                                force=args.force,
                            )
                        else:
                            raise Exception(
                                "In wikiedit WikiSON edit mode '--pages', '--template' and '--property' need to be defined ('--value' is optional see '--help')"
                            )

                elif mode == "wikirestore":
                    wikipush.restore(
                        pages, backupPath=args.backupPath, stdIn=args.stdinp
                    )
                else:
                    raise Exception("undefined wikipush mode %s" % mode)

    except KeyboardInterrupt:
        ### handle keyboard interrupt ###
        return 1
    except Exception as e:
        if DEBUG:
            raise
        indent = len(program_name) * " "
        sys.stderr.write(program_name + ": " + repr(e) + "\n")
        sys.stderr.write(indent + "  for help use --help")
        # args is still None if the failure happened before parsing finished
        if args is not None and args.debug:
            print(traceback.format_exc())
        return 2

wikiquery

Created on 2024-04-18

@author: wf

wikirestore

Created on 2021-02-16

@author: wf

wikitext

Created on 2023-02-24 @author: tholzheim

WikiMarkup

Provides methods to modify, query and update Templates in wiki markup see https://en.wikipedia.org/wiki/Help:Wikitext

Source code in wikibot3rd/wikitext.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class WikiMarkup:
    """
    Provides methods to modify, query and update Templates in wiki markup
    see https://en.wikipedia.org/wiki/Help:Wikitext
    """

    def __init__(
        self,
        page_title: str,
        wiki_markup: typing.Optional[str] = None,
        debug: bool = False,
    ):
        """
        constructor

        Args:
            page_title: page title of the wiki_markup file
            wiki_markup: WikiPage content as string (may be None if there is no content yet)
            debug: debug flag, stored on the instance
        """
        self.page_title = page_title
        self.debug = debug
        self._wiki_markup = wiki_markup
        # parsed lazily on first access of parsed_wiki_markup
        self._parsed_wiki_markup: typing.Optional[wtp.WikiText] = None

    @property
    def wiki_markup(self) -> typing.Optional[str]:
        """
        Get wiki markup. If wikimarkup was parsed the parsed markup is converted back to wiki markup

        Returns:
            str: the markup (None if no markup was set and nothing was parsed)
        """
        if self._parsed_wiki_markup is None:
            return self._wiki_markup
        return str(self._parsed_wiki_markup)

    @wiki_markup.setter
    def wiki_markup(self, wiki_markup: str):
        """
        Set the raw markup and keep an already existing parse result in sync
        """
        self._wiki_markup = wiki_markup
        if self._parsed_wiki_markup is not None:
            # update parsed wiki_markup
            self._parsed_wiki_markup = wtp.parse(wiki_markup)

    @property
    def parsed_wiki_markup(self) -> typing.Optional[wtp.WikiText]:
        """
        Get WikiText. If not already parsed the markup is parsed

        Returns:
            wtp.WikiText: the parsed markup (None if there is no markup to parse)
        """
        if self._parsed_wiki_markup is None and self._wiki_markup is not None:
            self._parsed_wiki_markup = wtp.parse(self._wiki_markup)
        return self._parsed_wiki_markup

    @parsed_wiki_markup.setter
    def parsed_wiki_markup(self, parsed_wiki_markup: wtp.WikiText):
        self._parsed_wiki_markup = parsed_wiki_markup

    def _get_templates_by_name(
        self,
        template_name: str,
        match: typing.Optional[typing.Dict[str, str]] = None,
    ) -> typing.List[Template]:
        """
        Returns the templates matching the given name and if additional matches are defined the values of the template
        also have to match these

        Args:
            template_name: name of the template
            match(dict): Additional matching criteria (argument name -> required value)

        Returns:
            list of templates with the given name matching the given criteria
        """
        # the default used to be the type object typing.Dict[str, str], which
        # made match.items() fail whenever the caller omitted match
        if match is None:
            match = {}
        parsed = self.parsed_wiki_markup
        if parsed is None or parsed.templates is None:
            # markup has no templates
            return []
        target_template_name = template_name.strip()
        matching_templates = []
        for template in parsed.templates:
            if template.name.strip() != target_template_name:
                continue
            # every criterion has to hold; all() stops at the first mismatch
            if all(template.has_arg(key, value) for key, value in match.items()):
                matching_templates.append(template)
        return matching_templates

    @classmethod
    def _get_template_arguments(cls, template: Template) -> typing.Dict[str, str]:
        """
        Get the arguments of the given template

        Args:
            template: template to extract the arguments from

        Returns:
            dict: arguments of the template (names and values stripped of surrounding whitespace)
        """
        return {arg.name.strip(): arg.value.strip() for arg in template.arguments}

    def add_template(self, template_name: str, data: dict):
        """
        Adds the given data as template with the given name to this wikifile.
        The data is added as new template object at the end of the markup.

        Args:
            template_name(str): Name of the template the data should be inserted in
            data(dict): Data that should be saved in form of a template; None values are skipped
        """
        parts = ["{{" + template_name + "\n"]
        parts.extend(
            f"|{key}={value}\n" for key, value in data.items() if value is not None
        )
        parts.append("}}")
        template = Template("".join(parts))
        existing = self.wiki_markup
        if existing is None:
            # no markup yet - avoid rendering the literal text "None" in front of the template
            self.wiki_markup = str(template)
        else:
            self.wiki_markup = f"{existing}\n{template}"

    def update_template(
        self,
        template_name: str,
        args: dict,
        overwrite: bool = False,
        update_all: bool = False,
        match: typing.Optional[typing.Dict[str, str]] = None,
    ):
        """
        Updates the given template with the values from the given dict args.
        If no template matches, a new template is appended instead.

        Args:
            template_name(str): name of the template that should be updated
            args(dict): Dict containing the arguments that should be set. key=argument name, value=argument value
            overwrite(bool): If True existing values will be overwritten
            update_all(bool): If True all matching templates are updated.
                              Otherwise, if more than one template matches, a UserWarning is issued and nothing is changed.
            match(dict): matching criteria for the template.

        Returns:
            Nothing
        """
        if match is None:
            match = {}
        matching_templates = self._get_templates_by_name(template_name, match=match)
        if not matching_templates:
            self.add_template(template_name, args)
            return
        if len(matching_templates) > 1 and not update_all:
            # ambiguous target: leave the markup untouched
            warnings.warn(
                "More than one template were matched. Either improve the matching criteria or enable update_all",
                UserWarning,
            )
            return
        for template in matching_templates:
            self._update_arguments(template, args, overwrite)

    @classmethod
    def _update_arguments(cls, template: Template, args: dict, overwrite: bool = False):
        """
        Updates the arguments of the given template with the values of the given dict (args)

        Args:
            template: Template that should be updated
            args(dict): Dict containing the arguments that should be set. key=argument name, value=argument value
            overwrite(bool): If True existing values will be overwritten;
                             an existing argument whose new value is None is removed

        Returns:
            Nothing
        """
        postfix = "\n"
        for key, value in args.items():
            if template.has_arg(key):
                if not overwrite:
                    # keep the existing value
                    continue
                template.del_arg(key)
                if value is not None:
                    template.set_arg(key, str(value) + postfix)
            elif value is not None:
                template.set_arg(key, str(value) + postfix, preserve_spacing=False)

    def extract_template(
        self, template_name: str, match: typing.Optional[typing.Dict[str, str]] = None
    ) -> typing.List[typing.Dict[str, str]]:
        """
        Extracts the template data and returns it as list of dicts

        Args:
            template_name: name of the template that should be extracted
            match(dict): Additional matching criteria

        Returns:
            list of dicts: records of the templates that match the given name;
            templates without any argument are left out
        """
        if match is None:
            match = {}
        templates = self._get_templates_by_name(template_name, match=match)
        lod = []
        for template in templates:
            if template is None:
                continue
            records = self._get_template_arguments(template)
            if records:
                lod.append(records)
        return lod

    def __str__(self) -> str:
        # __str__ must return a str - fall back to "" when there is no markup
        markup = self.wiki_markup
        return "" if markup is None else markup

parsed_wiki_markup: wtp.WikiText property writable

Get WikiText. If not already parsed the markup is parsed

Returns:

Name Type Description
wtp WikiText

WikiText

wiki_markup: str property writable

Get wiki markup. If wikimarkup was parsed the parsed markup is converted back to wiki markup

Returns:

Type Description
str

str

__init__(page_title, wiki_markup=None, debug=False)

Parameters:

Name Type Description Default
page_title str

page title of the wiki_markup file

required
wiki_markup str

WikiPage content as string. If None tries to init the wiki_markup from source location

None
Source code in wikibot3rd/wikitext.py
19
20
21
22
23
24
25
26
27
28
29
def __init__(self, page_title: str, wiki_markup: str = None, debug: bool = False):
    """
    Store the page title, the raw markup and the debug flag.

    Args:
        page_title: page title of the wiki_markup file
        wiki_markup: WikiPage content as string (None if not available)
        debug: debug flag kept on the instance
    """
    # nothing is parsed up front - the parse result slot starts out empty
    self._parsed_wiki_markup: typing.Optional[wtp.WikiText] = None
    self._wiki_markup = wiki_markup
    self.debug = debug
    self.page_title = page_title

add_template(template_name, data)

Adds the given data as template with the given name to this wikifile. The data is added as new template object to the wikifile. Args: template_name(str): Name of the template the data should be inserted in data(dict): Data that should be saved in form of a template

Source code in wikibot3rd/wikitext.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def add_template(self, template_name: str, data: dict):
    """
    Adds the given data as template with the given name to this wikifile.
    The data is added as new template object at the end of the markup.

    Args:
        template_name(str): Name of the template the data should be inserted in
        data(dict): Data that should be saved in form of a template; None values are skipped
    """
    parts = ["{{" + template_name + "\n"]
    parts.extend(
        f"|{key}={value}\n" for key, value in data.items() if value is not None
    )
    parts.append("}}")
    template = Template("".join(parts))
    existing = self.wiki_markup
    if existing is None:
        # no markup yet - avoid rendering the literal text "None" in front of the template
        self.wiki_markup = str(template)
    else:
        self.wiki_markup = f"{existing}\n{template}"

extract_template(template_name, match=None)

Extracts the template data and returns it as dict

Parameters:

Name Type Description Default
template_name str

name of the template that should be extracted

required
match(dict)

Additional matching criteria

required

Returns:

Type Description
List[Dict[str, str]]

list of dicts: records of the templates that match the given name

Source code in wikibot3rd/wikitext.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def extract_template(
    self, template_name: str, match: typing.Optional[typing.Dict[str, str]] = None
) -> typing.List[typing.Dict[str, str]]:
    """
    Collect the argument records of all templates with the given name

    Args:
        template_name: name of the template that should be extracted
        match(dict): Additional matching criteria

    Returns:
        list of dicts: one record per matching template; empty records are dropped
    """
    criteria = {} if match is None else match
    found = self._get_templates_by_name(template_name, match=criteria)
    # lazily map each real template to its argument dict ...
    extracted = (
        self._get_template_arguments(candidate)
        for candidate in found
        if candidate is not None
    )
    # ... and keep only the non-empty ones
    return [record for record in extracted if record]

update_template(template_name, args, overwrite=False, update_all=False, match=None)

Updates the given template with the values from the given dict args. If overwrite is set to True existing values will be overwritten. Args: template_name(str): name of the template that should be updated args(dict): Dict containing the arguments that should be set. key=argument name, value=argument value overwrite(bool): If True existing values will be overwritten update_all(bool): If True all matching templates are updated. Otherwise, only one template is updated; if multiple templates match, a warning is issued and nothing is changed. match(dict): matching criteria for the template.

Returns:

Type Description

Nothing

Source code in wikibot3rd/wikitext.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def update_template(
    self,
    template_name: str,
    args: dict,
    overwrite: bool = False,
    update_all: bool = False,
    match: typing.Optional[typing.Dict[str, str]] = None,
):
    """
    Update the arguments of the matching template(s) or append a new template
    when nothing matches.

    Args:
        template_name(str): name of the template that should be updated
        args(dict): Dict containing the arguments that should be set. key=argument name, value=argument value
        overwrite(bool): If True existing values will be overwritten
        update_all(bool): If True all matching templates are updated.
                          Otherwise a UserWarning is issued when more than one template matches
                          and no template is changed.
        match(dict): matching criteria for the template.

    Returns:
        Nothing
    """
    criteria = {} if match is None else match
    candidates = self._get_templates_by_name(template_name, match=criteria)
    if not candidates:
        # nothing to update - add the data as a new template
        self.add_template(template_name, args)
        return
    if len(candidates) > 1 and not update_all:
        warnings.warn(
            "More than one template were matched. Either improve the matching criteria or enable update_all",
            UserWarning,
        )
        return
    for candidate in candidates:
        self._update_arguments(candidate, args, overwrite)

WikiSON

WikiSON Api to edit WikiSON entities

Source code in wikibot3rd/wikitext.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
class WikiSON:
    """
    API for reading and writing WikiSON entities of a wiki page
    """

    def __init__(self, page_title: str, wiki_markup: str):
        """
        constructor

        Args:
            page_title: name of the wiki page
            wiki_markup: markup of the wiki page
        """
        # all template handling is delegated to the WikiMarkup wrapper
        self.wiki_markup = WikiMarkup(page_title, wiki_markup)

    def get(self, entity_type_name: str) -> typing.Optional[typing.Dict[str, str]]:
        """
        Look up the single WikiSON entity of the given type

        Args:
            entity_type_name: name of the WikiSON entity type e.g. Scholar, Event

        Raises:
            Exception: if wiki markup contains more than one WikiSON with the same entity type name

        Returns:
            dict: record of the entity or None if the page has no such entity
        """
        found = self.wiki_markup.extract_template(entity_type_name)
        count = len(found)
        if count > 1:
            raise Exception(
                "More than one WikiSON with the same entity type on one Page"
            )
        return found[0] if count == 1 else None

    def set(self, entity_type_name: str, record: dict) -> str:
        """
        Write the given data into the WikiSON entity of the given type,
        overwriting existing values

        Args:
            entity_type_name: name of the WikiSON entity type e.g. Scholar, Event
            record: data to add to the WikiSON entity

        Returns:
            str: wiki markup of the page
        """
        markup = self.wiki_markup
        markup.update_template(entity_type_name, args=record, overwrite=True)
        return markup.wiki_markup

__init__(page_title, wiki_markup)

constructor Args: page_title: name of the wiki page wiki_markup: markup of the wiki page

Source code in wikibot3rd/wikitext.py
230
231
232
233
234
235
236
237
def __init__(self, page_title: str, wiki_markup: str):
    """
    Wrap the page content in a WikiMarkup instance

    Args:
        page_title: name of the wiki page
        wiki_markup: markup of the wiki page
    """
    markup_wrapper = WikiMarkup(page_title, wiki_markup)
    self.wiki_markup = markup_wrapper

get(entity_type_name)

Get the WikiSON entity by the given name Args: entity_type_name: name of the WikiSON entity type e.g. Scholar, Event

Raises:

Type Description
Exception

if wiki markup contains more than one WikiSON with the same entity type name

Returns:

Name Type Description
dict Optional[Dict[str, str]]

record of the entity

Source code in wikibot3rd/wikitext.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def get(self, entity_type_name: str) -> typing.Optional[typing.Dict[str, str]]:
    """
    Look up the single WikiSON entity of the given type

    Args:
        entity_type_name: name of the WikiSON entity type e.g. Scholar, Event

    Raises:
        Exception: if wiki markup contains more than one WikiSON with the same entity type name

    Returns:
        dict: record of the entity or None if the page has no such entity
    """
    found = self.wiki_markup.extract_template(entity_type_name)
    count = len(found)
    if count > 1:
        raise Exception(
            "More than one WikiSON with the same entity type on one Page"
        )
    return found[0] if count == 1 else None

set(entity_type_name, record)

Set WikiSON entity with the given type and data Args: entity_type_name: name of the WikiSON entity type e.g. Scholar, Event record: data to add to the WikiSON entity

Returns:

Name Type Description
str str

wiki markup of the page

Source code in wikibot3rd/wikitext.py
262
263
264
265
266
267
268
269
270
271
272
273
def set(self, entity_type_name: str, record: dict) -> str:
    """
    Write the given data into the WikiSON entity of the given type,
    overwriting existing values

    Args:
        entity_type_name: name of the WikiSON entity type e.g. Scholar, Event
        record: data to add to the WikiSON entity

    Returns:
        str: wiki markup of the page
    """
    markup = self.wiki_markup
    markup.update_template(entity_type_name, args=record, overwrite=True)
    return markup.wiki_markup

wikiupload

Created on 2020-11-12

@author: wf

wikiuser

Created on 2020-11-01

@author: wf

WikiUser

Bases: object

User credentials for a specific wiki

Source code in wikibot3rd/wikiuser.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
class WikiUser(object):
    """
    User credentials for a specific wiki
    """

    def __init__(self):
        """
        construct me with all (encrypted) credential fields set to None
        """
        for field in WikiUser.getFields():
            setattr(self, field, None)

    def get_password(self):
        """
        snake_case alias of getPassword
        """
        return self.getPassword()

    def getPassword(self) -> str:
        """
        get my decrypted password

        Returns:
            str: the decrypted password for this user
        """
        c = Crypt(self.cypher, 20, self.salt)
        return c.decrypt(self.secret)

    def get_wiki_url(self):
        """
        snake_case alias of getWikiUrl
        """
        return self.getWikiUrl()

    def getWikiUrl(self):
        """
        return the full url of this wiki

        Returns:
            str: the full url of this wiki (url followed by scriptPath)
        """
        return f"{self.url}{self.scriptPath}"

    def interactiveSave(
        self, yes: bool = False, interactive: bool = False, filePath=None
    ):
        """
        save me

        Args:
            yes (bool): if True save without asking
            interactive (bool): if True get interactive input
            filePath (str): the path where to save the credentials ini file
        """
        fields = WikiUser.getFields(encrypted=False)
        text = ""
        for field in fields:
            value = getattr(self, field, None)
            if interactive:
                print(text)
                inputValue = input(f"{field} ({value}): ")
                if inputValue:
                    setattr(self, field, inputValue)
                    value = inputValue
            text += f"\n  {field}={value}"
        # encrypt
        self.encrypt()
        if not yes:
            answer = input(
                f"shall i store credentials for {text}\nto an ini file? yes/no y/n"
            )
            # a substring test ('"y" in answer') would accept e.g. "nay" and
            # reject "Yes" - look at the normalized start of the answer instead
            yes = answer.strip().lower().startswith("y")
        if yes:
            self.save(filePath)

    def encrypt(self, remove: bool = True):
        """
        encrypt my clear text password

        Args:
            remove (bool): if True remove the original password
        """
        crypt = Crypt.getRandomCrypt()
        self.secret = crypt.encrypt(self.password)
        self.cypher = crypt.cypher.decode()
        self.salt = crypt.salt.decode()
        if remove:
            delattr(self, "password")

    def __str__(self):
        return f"{self.user} {self.wikiId}"

    @staticmethod
    def getIniPath():
        """
        get the directory in the user's home that holds the credential ini files
        """
        home = str(Path.home())
        return f"{home}/.mediawiki-japi"

    @staticmethod
    def iniFilePath(wikiId: str):
        """
        get the path of the ini file of the current os user for the given wikiId
        """
        user = getpass.getuser()
        iniPath = WikiUser.getIniPath()
        return f"{iniPath}/{user}_{wikiId}.ini"

    @staticmethod
    def ofWikiId(wikiId: str, lenient=False) -> "WikiUser":
        """
        create a wikiUser for the given wikiId

        Args:
            wikiId (str): the wikiId of the user to be created
            lenient (bool): if True ignore parsing errors in the ini file

        Raises:
            FileNotFoundError: if there is no ini file for the given wikiId

        Returns:
            WikiUser: the wikiUser for this wikiId
        """
        path = WikiUser.iniFilePath(wikiId)
        try:
            config = WikiUser.readPropertyFile(path)
        except FileNotFoundError as _e:
            errMsg = f'the wiki with the wikiID "{wikiId}" does not have a corresponding configuration file ... you might want to create one with the wikiuser command'
            raise FileNotFoundError(errMsg) from _e
        return WikiUser.ofDict(config, lenient=lenient)

    def save(self, iniFilePath=None):
        """
        save me to a propertyFile

        Args:
            iniFilePath (str): target path; if None the default ini file for
                my wikiId is used and its directory is created when missing
        """
        if iniFilePath is None:
            makedirs(WikiUser.getIniPath(), exist_ok=True)
            iniFilePath = WikiUser.iniFilePath(self.wikiId)

        isodate = datetime.datetime.now().isoformat()
        lines = [
            f"# Mediawiki JAPI credentials for {self.wikiId}\n",
            f"# created by py-3rdparty-mediawiki WikiUser at {isodate}\n",
        ]
        for field in WikiUser.getFields():
            # tolerate fields that were never set on this instance
            value = getattr(self, field, None)
            if value is not None:
                lines.append(f"{field}={value}\n")

        # build the content first and let the context manager close the file
        # even if writing fails
        with open(iniFilePath, "w") as iniFile:
            iniFile.write("".join(lines))

    @staticmethod
    def readPropertyFile(filepath, sep="=", comment_char="#") -> Dict[str, str]:
        """
        Read the file passed as parameter as a properties file.
        https://stackoverflow.com/a/31852401/1497139

        Args:
            filepath: path of the properties file
            sep (str): separator between key and value
            comment_char (str): lines starting with this prefix are ignored

        Returns:
            dict: the key/value pairs; surrounding whitespace and double quotes
            are stripped from the values
        """
        props = {}
        with open(filepath, "rt") as f:
            for line in f:
                l = line.strip()
                if l and not l.startswith(comment_char):
                    # only the first separator splits - values may contain sep
                    key, _, value = l.partition(sep)
                    props[key.strip()] = value.strip().strip('"')
        return props

    @staticmethod
    def getWikiUsers(lenient: bool = False):
        """
        get all wiki users for which an ini file exists

        Args:
            lenient (bool): if True ignore missing fields in the ini files

        Returns:
            dict: wikiId -> WikiUser; unreadable ini files are reported on stdout and skipped
        """
        wikiUsers = {}
        iniPath = WikiUser.getIniPath()
        if os.path.isdir(iniPath):
            with os.scandir(iniPath) as it:
                for entry in it:
                    if entry.name.endswith(".ini") and entry.is_file():
                        try:
                            config = WikiUser.readPropertyFile(entry.path)
                            wikiUser = WikiUser.ofDict(config, lenient=lenient)
                            wikiUsers[wikiUser.wikiId] = wikiUser
                        except Exception as ex:
                            # best effort: one broken file must not hide the others
                            print("error in %s: %s" % (entry.path, str(ex)))
        return wikiUsers

    @staticmethod
    def getFields(encrypted=True):
        """
        get the names of the credential fields

        Args:
            encrypted (bool): if True list the cypher/secret/salt fields,
                otherwise the clear text password field

        Returns:
            list: a fresh list of field names
        """
        fields = ["wikiId", "url", "scriptPath", "version", "user", "email"]
        passwordFields = ["cypher", "secret", "salt"] if encrypted else ["password"]
        return fields + passwordFields

    @staticmethod
    def ofDict(userDict, encrypted=True, lenient=False, encrypt=True):
        """
        create a WikiUser from the given dict

        Args:
            userDict (dict): field name -> value
            encrypted (bool): if True expect cypher/secret/salt, otherwise password
            lenient (bool): if True missing fields are ignored
            encrypt (bool): if True encrypt a clear text password right away

        Raises:
            Exception: if a field is missing and lenient is False

        Returns:
            WikiUser: the new user
        """
        wikiUser = WikiUser()
        # work on a copy so the caller's dict is not modified
        values = dict(userDict)
        # fix http\: entries from Java created entries
        if values.get("url") is not None:
            values["url"] = values["url"].replace("\\:", ":")

        for field in WikiUser.getFields(encrypted):
            if field in values:
                setattr(wikiUser, field, values[field])
            elif not lenient:
                raise Exception(f"{field} missing")
        if not encrypted and encrypt:
            wikiUser.encrypt()
        return wikiUser

__init__()

construct me

Source code in wikibot3rd/wikiuser.py
27
28
29
30
31
32
33
def __init__(self):
    """
    Initialize every credential field reported by WikiUser.getFields() with None
    """
    for field_name in WikiUser.getFields():
        setattr(self, field_name, None)

encrypt(remove=True)

encrypt my clear text password

Parameters:

Name Type Description Default
remove bool

if True remove the original password

True
Source code in wikibot3rd/wikiuser.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
def encrypt(self, remove: bool = True):
    """
    Encrypt the clear text password with a freshly generated random Crypt
    and keep secret, cypher and salt on this instance

    Args:
        remove (bool): if True remove the original password
    """
    cipher = Crypt.getRandomCrypt()
    self.secret = cipher.encrypt(self.password)
    self.cypher = cipher.cypher.decode()
    self.salt = cipher.salt.decode()
    if not remove:
        return
    # drop the clear text once the encrypted form is stored
    del self.password

getPassword()

get my decrypted password

Returns:

Name Type Description
str str

the decrypted password for this user

Source code in wikibot3rd/wikiuser.py
39
40
41
42
43
44
45
46
47
def getPassword(self) -> str:
    """
    Decrypt my secret using my cypher and salt (20 iterations)

    Returns:
        str: the decrypted password for this user
    """
    return Crypt(self.cypher, 20, self.salt).decrypt(self.secret)

getWikiUrl()

return the full url of this wiki

Returns:

Name Type Description
str

the full url of this wiki

Source code in wikibot3rd/wikiuser.py
53
54
55
56
57
58
59
60
61
def getWikiUrl(self):
    """
    Build the full url of this wiki from url and scriptPath

    Returns:
        str: the full url of this wiki
    """
    return f"{self.url}{self.scriptPath}"

interactiveSave(yes=False, interactive=False, filePath=None)

save me

Parameters:

Name Type Description Default
yes bool

if True save without asking

False
interactive bool

if True get interactive input

False
filePath str

the path where to save the credentials ini file

None
Source code in wikibot3rd/wikiuser.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def interactiveSave(
    self, yes: bool = False, interactive: bool = False, filePath=None
):
    """
    save me

    Args:
        yes (bool): if True save without asking
        interactive (bool): if True get interactive input
        filePath (str): the path where to save the credentials ini file
    """
    fields = WikiUser.getFields(encrypted=False)
    text = ""
    for field in fields:
        value = getattr(self, field, None)
        if interactive:
            print(text)
            inputValue = input(f"{field} ({value}): ")
            if inputValue:
                setattr(self, field, inputValue)
                value = inputValue
        text += f"\n  {field}={value}"
    # encrypt
    self.encrypt()
    if not yes:
        answer = input(
            f"shall i store credentials for {text}\nto an ini file? yes/no y/n"
        )
        # a substring test ('"y" in answer') would accept e.g. "nay" and
        # reject "Yes" - look at the normalized start of the answer instead
        yes = answer.strip().lower().startswith("y")
    if yes:
        self.save(filePath)

ofWikiId(wikiId, lenient=False) staticmethod

create a wikiUser for the given wikiId

Parameters:

Name Type Description Default
wikiId str

the wikiId of the user to be created

required
lenient bool

if True ignore parsing errors in the ini file

False

Returns:

Name Type Description
WikiUser WikiUser

the wikiUser for this wikiId

Source code in wikibot3rd/wikiuser.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@staticmethod
def ofWikiId(wikiId: str, lenient=False) -> "WikiUser":
    """
    Load the WikiUser for the given wikiId from its ini file

    Args:
        wikiId (str): the wikiId of the user to be created
        lenient (bool): if True ignore parsing errors in the ini file

    Raises:
        FileNotFoundError: if there is no ini file for the given wikiId

    Returns:
        WikiUser: the wikiUser for this wikiId
    """
    ini_file = WikiUser.iniFilePath(wikiId)
    try:
        properties = WikiUser.readPropertyFile(ini_file)
    except FileNotFoundError as _e:
        # replace the low-level error with a hint on how to fix the setup
        raise FileNotFoundError(
            f'the wiki with the wikiID "{wikiId}" does not have a corresponding configuration file ... you might want to create one with the wikiuser command'
        )
    return WikiUser.ofDict(properties, lenient=lenient)

readPropertyFile(filepath, sep='=', comment_char='#') staticmethod

Read the file passed as parameter as a properties file. https://stackoverflow.com/a/31852401/1497139

Source code in wikibot3rd/wikiuser.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@staticmethod
def readPropertyFile(filepath, sep="=", comment_char="#") -> Dict[str, str]:
    """
    Read the file passed as parameter as a properties file.
    https://stackoverflow.com/a/31852401/1497139

    Blank lines and lines starting with comment_char are skipped. Every
    other line is split at the first occurrence of sep: the part before it
    is the key, everything after it (including further separators) is the
    value. Keys and values are stripped of surrounding whitespace, values
    additionally of surrounding double quotes. A line without sep yields
    the whole line as key with an empty string as value.

    Args:
        filepath: the path of the properties file to read
        sep (str): the separator between key and value
        comment_char (str): the prefix that marks a comment line

    Returns:
        Dict[str, str]: the properties found, later keys override earlier ones
    """
    props = {}
    with open(filepath, "rt") as f:
        for line in f:
            stripped = line.strip()
            if not stripped or stripped.startswith(comment_char):
                continue
            # partition splits at the first separator only, so the value
            # may itself contain the separator (e.g. urls with query strings)
            key, _, value = stripped.partition(sep)
            props[key.strip()] = value.strip().strip('"')
    return props

save(iniFilePath=None)

save me to a propertyFile

Source code in wikibot3rd/wikiuser.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
    def save(self, iniFilePath=None):
        """
        save me to a propertyFile
        """
        if iniFilePath is None:
            iniPath = WikiUser.getIniPath()
            if not isdir(iniPath):
                makedirs(iniPath)
            iniFilePath = WikiUser.iniFilePath(self.wikiId)

        iniFile = open(iniFilePath, "w")
        isodate = datetime.datetime.now().isoformat()
        template = """# Mediawiki JAPI credentials for %s
# created by py-3rdparty-mediawiki WikiUser at %s
"""
        content = template % (self.wikiId, isodate)
        for field in WikiUser.getFields():
            value = self.__dict__[field]
            if value is not None:
                content += "%s=%s\n" % (field, value)

        iniFile.write(content)
        iniFile.close()

main(argv=None)

WikiUser credential handling

Source code in wikibot3rd/wikiuser.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def main(argv=None):  # IGNORE:C0111
    """
    WikiUser credential handling

    Parses the command line, creates a WikiUser from the parsed arguments
    and hands over to its interactiveSave method.

    Args:
        argv (list): the command line arguments to parse; if None
            sys.argv[1:] is used

    Returns:
        int: 1 if interrupted from the keyboard, 2 if an exception was
            reported, None otherwise
    """

    if argv is None:
        argv = sys.argv[1:]

    program_name = os.path.basename(sys.argv[0])
    program_version = "v%s" % __version__
    program_build_date = str(__updated__)
    program_version_message = "%%(prog)s %s (%s)" % (
        program_version,
        program_build_date,
    )
    program_shortdesc = "WikiUser credential handling"
    user_name = "Wolfgang Fahl"

    program_license = """%s

  Created by %s on %s.
  Copyright 2020-2024 Wolfgang Fahl. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied.

USAGE
""" % (
        program_shortdesc,
        user_name,
        str(__date__),
    )

    # bound before the try block so that the exception handler below can
    # safely check it even when the failure happened before parsing finished
    args = None
    try:
        # Setup argument parser
        parser = ArgumentParser(
            description=program_license, formatter_class=RawDescriptionHelpFormatter
        )
        parser.add_argument(
            "-d",
            "--debug",
            dest="debug",
            action="count",
            help="set debug level [default: %(default)s]",
        )
        parser.add_argument(
            "-V", "--version", action="version", version=program_version_message
        )
        parser.add_argument("-i", "--interactive", action="store_true")
        parser.add_argument("-e", "--email", dest="email", help="email of the user")
        parser.add_argument("-f", "--file", dest="filePath", help="ini-file path")
        parser.add_argument("-l", "--url", dest="url", help="url of the wiki")
        parser.add_argument(
            "-s",
            "--scriptPath",
            dest="scriptPath",
            help="script path default: %(default)s)",
            default="",
        )
        parser.add_argument("-p", "--password", dest="password", help="password")
        parser.add_argument(
            "-u",
            "--user",
            dest="user",
            help="os user id default: %(default)s)",
            default=getpass.getuser(),
        )
        parser.add_argument(
            "-v",
            "--wikiVersion",
            dest="version",
            default="MediaWiki 1.39.1",
            help="version of the wiki default: %(default)s)",
        )
        parser.add_argument("-w", "--wikiId", dest="wikiId", help="wiki Id")
        parser.add_argument(
            "-y",
            "--yes",
            dest="yes",
            action="store_true",
            help="immediately store without asking",
        )
        # Process arguments
        args = parser.parse_args(argv)
        argsDict = vars(args)
        wikiuser = WikiUser.ofDict(
            argsDict, encrypted=False, lenient=True, encrypt=False
        )
        wikiuser.interactiveSave(args.yes, args.interactive, args.filePath)

    except KeyboardInterrupt:
        ### handle keyboard interrupt ###
        return 1
    except Exception as e:
        if DEBUG:
            # re-raise the active exception with its original traceback
            raise
        indent = len(program_name) * " "
        sys.stderr.write(program_name + ": " + repr(e) + "\n")
        sys.stderr.write(indent + "  for help use --help")
        # args is still None when the error occurred before parse_args returned
        if args is not None and args.debug:
            print(traceback.format_exc())
        return 2