Skip to content

py_ez_wikidata API Documentation

prefixes

Created on 2024-03-02

@author: wf

Prefixes

handle standard Prefixes

Source code in ez_wikidata/prefixes.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Prefixes:
    """
    handle standard Prefixes

    provides the SPARQL PREFIX declarations for the namespaces
    commonly used with Wikidata
    """

    # mapping of prefix name to the IRI reference (including angle brackets)
    # see also https://www.wikidata.org/wiki/EntitySchema:E49
    PREFIX_MAP = {
        "bd": "<http://www.bigdata.com/rdf#>",
        "cc": "<http://creativecommons.org/ns#>",
        "dct": "<http://purl.org/dc/terms/>",
        "geo": "<http://www.opengis.net/ont/geosparql#>",
        "ontolex": "<http://www.w3.org/ns/lemon/ontolex#>",
        "owl": "<http://www.w3.org/2002/07/owl#>",
        "p": "<http://www.wikidata.org/prop/>",
        "pq": "<http://www.wikidata.org/prop/qualifier/>",
        "pqn": "<http://www.wikidata.org/prop/qualifier/value-normalized/>",
        "pqv": "<http://www.wikidata.org/prop/qualifier/value/>",
        "pr": "<http://www.wikidata.org/prop/reference/>",
        "prn": "<http://www.wikidata.org/prop/reference/value-normalized/>",
        "prov": "<http://www.w3.org/ns/prov#>",
        "prv": "<http://www.wikidata.org/prop/reference/value/>",
        "ps": "<http://www.wikidata.org/prop/statement/>",
        "psn": "<http://www.wikidata.org/prop/statement/value-normalized/>",
        "psv": "<http://www.wikidata.org/prop/statement/value/>",
        "rdf": "<http://www.w3.org/1999/02/22-rdf-syntax-ns#>",
        "rdfs": "<http://www.w3.org/2000/01/rdf-schema#>",
        "schema": "<http://schema.org/>",
        "skos": "<http://www.w3.org/2004/02/skos/core#>",
        "wd": "<http://www.wikidata.org/entity/>",
        "wdata": "<http://www.wikidata.org/wiki/Special:EntityData/>",
        "wdno": "<http://www.wikidata.org/prop/novalue/>",
        "wdref": "<http://www.wikidata.org/reference/>",
        "wds": "<http://www.wikidata.org/entity/statement/>",
        "wdt": "<http://www.wikidata.org/prop/direct/>",
        "wdtn": "<http://www.wikidata.org/prop/direct-normalized/>",
        "wdv": "<http://www.wikidata.org/value/>",
        "wikibase": "<http://wikiba.se/ontology#>",
        "xsd": "<http://www.w3.org/2001/XMLSchema#>",
    }

    @classmethod
    def getPrefixes(
        cls, prefixes=("rdf", "rdfs", "schema", "wd", "wdt", "wikibase", "xsd")
    ):
        """
        get the SPARQL PREFIX declarations for the given prefix names

        Args:
            prefixes(iterable): the prefix names to declare, in output order;
                names that are not in the prefix map are silently skipped

        Returns:
            str: one "PREFIX name: <iri>" line per known prefix, each
                terminated by a newline (empty string if nothing matches)
        """
        # the default is an immutable tuple so that it can not be
        # modified by accident between calls
        prefixMap = cls.PREFIX_MAP
        sparql = "".join(
            f"PREFIX {prefix}: {prefixMap[prefix]}\n"
            for prefix in prefixes
            if prefix in prefixMap
        )
        return sparql

trulytabular

Created on 2022-04-14

@author: wf

TrulyTabular

Bases: object

truly tabular SPARQL/RDF analysis

checks "how tabular" a query is that is based on a list of properties of an item class

Source code in ez_wikidata/trulytabular.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
class TrulyTabular(object):
    """
    truly tabular SPARQL/RDF analysis

    checks "how tabular" a query is that is based on a list of properties
    of an item class, i.e. how often a property has more than one value
    for the same item
    """

    def __init__(
        self,
        itemQid,
        propertyLabels: list = None,
        propertyIds: list = None,
        search_predicate="wdt:P31",
        where: str = None,
        endpointConf=None,
        lang="en",
        debug=False,
    ):
        """
        Constructor

        Args:
            itemQid(str): wikidata id of the type to analyze
            propertyLabels(list): a list of labels of properties to be considered
                (None is handled as an empty list)
            propertyIds(list): a list of ids of properties to be considered
                (None is handled as an empty list)
            search_predicate(str): the search predicate to use e.g. instanceof / subclass of
            where(str): extra where clause for instance selection (if any)
            endpointConf: the SPARQL endpoint configuration to use - its
                endpoint, method and database attributes are read; if None
                Endpoint.getDefault() is used
            lang(str): the language to use for labels
            debug(bool): if True switch on debugging
        """
        # None instead of [] as default avoids sharing one mutable list
        # object between all constructor calls
        if propertyLabels is None:
            propertyLabels = []
        if propertyIds is None:
            propertyIds = []
        self.itemQid = itemQid
        self.debug = debug
        if endpointConf is None:
            endpointConf = Endpoint.getDefault()
        self.endpointConf = endpointConf
        self.wpm = WikidataPropertyManager.get_instance(
            endpoint_url=endpointConf.endpoint
        )
        self.sparql = SPARQL(endpointConf.endpoint, method=self.endpointConf.method)
        self.sparql.debug = self.debug
        self.search_predicate = search_predicate
        # the extra where clause is appended on its own indented line
        self.where = f"\n  {where}" if where is not None else ""
        self.lang = lang
        self.item = WikidataItem(
            itemQid, sparql=self.sparql, lang=lang, debug=self.debug
        )
        self.queryManager = TrulyTabular.getQueryManager(debug=self.debug)
        # properties selected by id first, then extended by those selected by label
        self.properties = self.wpm.get_properties_by_ids(propertyIds)
        self.properties.update(
            self.wpm.get_properties_by_labels(propertyLabels, lang=lang)
        )
        self.isodate = datetime.datetime.now().isoformat()
        # last exception caught by count() - None if there was none
        self.error = None

    def __str__(self):
        """
        Returns:
            str: my text representation
        """
        return self.asText(long=False)

    def count(self):
        """
        get my count

        Returns:
            tuple: (count, query) - count is the number of distinct items as
                an int or None if the query failed (the exception is then
                available in self.error), query is the SPARQL query text used
        """
        itemText = self.getItemText()
        query = f"""# Count all items with the given type
# {itemText}
{Prefixes.getPrefixes()}
SELECT (COUNT (DISTINCT ?item) AS ?count)
WHERE
{{
  # instance of {self.item.qlabel}
  ?item {self.search_predicate} wd:{self.item.qid}.{self.where}
}}"""
        try:
            count = self.sparql.getValue(query, "count")
            # workaround https://github.com/ad-freiburg/qlever/issues/717
            count = int(count)
        except Exception as ex:
            # deliberate best effort: remember the problem and signal it with None
            self.error = ex
            count = None

        return count, query

    def asText(self, long: bool = True):
        """
        returns my content as a text representation

        Args:
            long(bool): True if a long format including url is wished

        Returns:
            str: a text representation of my content
        """
        text = self.item.asText(long)
        return text

    def getItemText(self):
        """
        get a short text for my item to be used in query comments

        Returns:
            str: my item's Qid and label separated by a colon
        """
        # leads to 405 Method not allowed in SPARQLWrapper under certain circumstances
        # itemText=self.asText(long=True)
        itemText = f"{self.itemQid}:{self.item.qlabel}"
        return itemText

    @classmethod
    def getQueryManager(cls, lang="sparql", name="trulytabular", debug=False):
        """
        get the query manager for the given language and fileName

        Args:
            lang(str): the language of the queries to extract
            name(str): the name of the manager containing the query specifications
            debug(bool): if True set debugging on

        Returns:
            QueryManager: the manager for the first existing yaml file found
                or None if there is no such file
        """
        qYamlFileName = f"{name}.yaml"
        for qYamlFile in YamlPath.getPaths(qYamlFileName):
            if os.path.isfile(qYamlFile):
                qm = QueryManager(lang=lang, debug=debug, queriesPath=qYamlFile)
                return qm
        return None

    def generateSparqlQuery(
        self,
        genMap: dict,
        listSeparator: str = "⇹",
        naive: bool = True,
        lang: str = "en",
    ) -> str:
        """
        generate a SPARQL Query

        Args:
            genMap(dict): a dictionary of generation items aggregates/ignores/labels
                keyed by property id - each value is a list of entries such as
                "ignore", "label", "list", "count" or the name of another
                aggregate function
            listSeparator(str): the symbol to use as a list separator for GROUP_CONCAT
            naive(bool): if True - generate a naive straight forward SPARQL query
                if False generate a proper truly tabular aggregate query
            lang(str): the language to generate for

        Returns:
            str: the generated SPARQL Query

        NOTE(review): the extra where clause (self.where) is not applied
        by this method - confirm whether that is intended
        """
        # The Wikidata item to generate the query for
        item = self.item
        # the name of this script
        script = Path(__file__).name
        # the mode of generation
        naiveText = "naive" if naive else "aggregate"
        # start with the preamble and PREFIX section
        # select the item and its label
        sparqlQuery = f"""# truly tabular {naiveText} query for 
# {item.qid}:{item.qlabel}
# generated by {script} version {Version.version} on {self.isodate}
{Prefixes.getPrefixes()}
SELECT ?{item.itemVarname} ?{item.labelVarname}"""
        # loop over all properties
        for wdProp in self.properties.values():
            if naive:
                sparqlQuery += f"\n  ?{wdProp.valueVarname}"
            else:
                if wdProp.pid in genMap:
                    genList = genMap[wdProp.pid]
                    for aggregate in genList:
                        if aggregate not in ["ignore", "label"]:
                            distinct = ""
                            if aggregate == "list":
                                aggregateFunc = "GROUP_CONCAT"
                                aggregateParam = f';SEPARATOR="{listSeparator}"'
                                distinct = "DISTINCT "
                            else:
                                if aggregate == "count":
                                    distinct = "DISTINCT "
                                # any other entry is used as the aggregate function name
                                aggregateFunc = aggregate.upper()
                                aggregateParam = ""
                            sparqlQuery += f"\n  ({aggregateFunc} ({distinct}?{wdProp.valueVarname}{aggregateParam}) AS ?{wdProp.valueVarname}_{aggregate})"
                        elif aggregate == "label":
                            sparqlQuery += f"\n  ?{wdProp.labelVarname}"
                        elif aggregate == "ignore" and "label" not in genList:
                            sparqlQuery += f"\n  ?{wdProp.valueVarname}"
        sparqlQuery += f"""
WHERE {{
  # instanceof {item.qid}:{item.qlabel}
  ?{item.itemVarname} {self.search_predicate} wd:{item.qid}.
  # label
  ?{item.itemVarname} rdfs:label ?{item.labelVarname}.  
  FILTER (LANG(?{item.labelVarname}) = "{lang}").
"""
        # one OPTIONAL clause per property
        for wdProp in self.properties.values():
            sparqlQuery += f"""  # {wdProp}
  OPTIONAL {{ 
    ?{item.itemVarname} wdt:{wdProp.pid} ?{wdProp.valueVarname}. """
            if wdProp.pid in genMap:
                genList = genMap[wdProp.pid]
                if "label" in genList:
                    sparqlQuery += f"""\n    ?{wdProp.valueVarname} rdfs:label ?{wdProp.labelVarname}."""
                    sparqlQuery += (
                        f"""\n    FILTER (LANG(?{wdProp.labelVarname}) = "{lang}")."""
                    )
            sparqlQuery += "\n  }\n"
        # close where Clause
        sparqlQuery += """}\n"""
        # optionally add Aggregate
        if not naive:
            sparqlQuery += f"""GROUP BY
  ?{item.itemVarname} 
  ?{item.labelVarname}
"""
            # non aggregated variables of the SELECT need to be grouped
            for wdProp in self.properties.values():
                if wdProp.pid in genMap:
                    genList = genMap[wdProp.pid]
                    if "label" in genList:
                        sparqlQuery += f"\n  ?{wdProp.labelVarname}"
                    if "ignore" in genList and "label" not in genList:
                        sparqlQuery += f"\n  ?{wdProp.valueVarname}"
            # properties marked "ignore" may have at most one value per item
            havingCount = 0
            havingDelim = "   "
            for wdProp in self.properties.values():
                if wdProp.pid in genMap:
                    genList = genMap[wdProp.pid]
                    if "ignore" in genList:
                        havingCount += 1
                        if havingCount == 1:
                            sparqlQuery += "\nHAVING ("

                        sparqlQuery += (
                            f"\n  {havingDelim}COUNT(?{wdProp.valueVarname})<=1"
                        )
                        havingDelim = "&& "
            if havingCount > 0:
                sparqlQuery += "\n)"
        return sparqlQuery

    def mostFrequentPropertiesQuery(self, whereClause: str = None, minCount: int = 0):
        """
        get the most frequently used properties

        Args:
            whereClause(str): an extra WhereClause to use - if None a clause
                selecting my items via my search predicate is generated
            minCount(int): if greater than zero only properties with a
                count above this value are selected

        Returns:
            Query: the query for the most frequently used properties
        """
        if whereClause is None:
            whereClause = f"?item {self.search_predicate} wd:{self.itemQid}"
            # qlever gets the predicates via ql:has-predicate below instead
            if self.endpointConf.database != "qlever":
                whereClause += ";?p ?id"
        whereClause += "."
        minCountFilter = ""
        if minCount > 0:
            minCountFilter = f"\n  FILTER(?count >{minCount})."
        itemText = self.getItemText()
        sparqlQuery = f"""# get the most frequently used properties for
# {itemText}
{Prefixes.getPrefixes()}
SELECT ?prop ?propLabel ?wbType ?count WHERE {{
  {{"""
        if self.endpointConf.database == "qlever":
            sparqlQuery += f"""
    SELECT ?p (COUNT(DISTINCT ?item) AS ?count) WHERE {{"""
        else:
            sparqlQuery += f"""
    SELECT ?prop (COUNT(DISTINCT ?item) AS ?count) WHERE {{"""
        if self.endpointConf.database == "blazegraph":
            sparqlQuery += f"""
      hint:Query hint:optimizer "None"."""
        sparqlQuery += f"""
      {whereClause}"""
        if self.endpointConf.database == "qlever":
            sparqlQuery += f"""  
      ?item ql:has-predicate ?p 
    }} GROUP BY ?p
  }}
  ?prop wikibase:directClaim ?p."""
        else:
            sparqlQuery += f"""
      ?prop wikibase:directClaim ?p.
    }}
    GROUP BY ?prop ?propLabel
  }}"""
        sparqlQuery += f"""
  ?prop rdfs:label ?propLabel.
  ?prop wikibase:propertyType ?wbType.
  FILTER(LANG(?propLabel) = "{self.lang}").{minCountFilter}  
}}
ORDER BY DESC (?count)
"""
        title = f"most frequently used properties for {self.item.asText(long=True)}"
        query = Query(
            name=f"mostFrequentProperties for {itemText}",
            query=sparqlQuery,
            title=title,
        )
        return query

    def noneTabularQuery(
        self, wdProperty: "WikidataProperty", asFrequency: bool = True
    ):
        """
        get the none tabular entries for the given property

        Args:
            wdProperty(WikidataProperty): the property to analyze
            asFrequency(bool): if true do a frequency analysis, otherwise
                select the items having more than one value

        Returns:
            Query: the query for the non tabular analysis
        """
        propertyLabel = wdProperty.plabel
        propertyId = wdProperty.pid
        # work around https://github.com/RDFLib/sparqlwrapper/issues/211
        if "described at" in propertyLabel:
            propertyLabel = propertyLabel.replace("described at", "describ'd at")
        # count the values of the property per item
        sparql = f"""SELECT ?item ?itemLabel (COUNT (?value) AS ?count)
WHERE
{{
  # instance of {self.item.qlabel}
  ?item {self.search_predicate} wd:{self.itemQid}.{self.where}
  ?item rdfs:label ?itemLabel.
  FILTER (LANG(?itemLabel) = "{self.lang}").
  # {propertyLabel}
  ?item {wdProperty.getPredicate()} ?value.
}} GROUP BY ?item ?itemLabel
"""
        if asFrequency:
            freqDesc = "frequencies"
            # the four braces render as two: the WHERE group plus the
            # group enclosing the per item sub query
            sparql = f"""SELECT ?count (COUNT(?count) AS ?frequency) WHERE {{{{
{sparql}
}}}}
GROUP BY ?count
ORDER BY DESC (?frequency)"""
        else:
            freqDesc = "records"
            sparql = f"""{sparql}
HAVING (COUNT (?value) > 1)
ORDER BY DESC(?count)"""
        itemText = self.getItemText()
        sparql = (
            f"""# Count all {itemText} items
# with the given {propertyLabel}({propertyId}) https://www.wikidata.org/wiki/Property:{propertyId} 
{Prefixes.getPrefixes()}
"""
            + sparql
        )
        title = f"non tabular entries for {self.item.qlabel}/{propertyLabel}:{freqDesc}"
        name = f"NonTabular {self.item.qlabel}/{propertyLabel}:{freqDesc}"
        query = Query(query=sparql, name=name, title=title)
        return query

    def noneTabular(self, wdProperty: "WikidataProperty"):
        """
        get the none tabular result for the given Wikidata property

        Args:
            wdProperty(WikidataProperty): the Wikidata property

        Returns:
            the list of dicts delivered by the frequency variant of the
            none tabular query
        """
        query = self.noneTabularQuery(wdProperty)
        if self.debug:
            logging.info(query.query)
        qlod = self.sparql.queryAsListOfDicts(query.query)
        return qlod

    def addStatsColWithPercent(
        self, m: dict, col: str, value: Union[int, float], total: Union[int, float]
    ):
        """
        add a statistics Column

        Args:
            m(dict): the statistics row to modify - gets the value at col
                and the percentage at col followed by a percent sign
            col(str): name of the column
            value: value
            total: total value - if it is None or not positive the
                percentage is set to None
        """
        m[col] = value
        if total is not None and total > 0:
            # percentage with one decimal digit
            m[f"{col}%"] = float(f"{value/total*100:.1f}")
        else:
            m[f"{col}%"] = None

    def genWdPropertyStatistic(
        self, wdProperty: "WikidataProperty", itemCount: int, withQuery=True
    ) -> dict:
        """
        generate a property Statistics Row for the given wikidata Property

        Args:
            wdProperty(WikidataProperty): the property to get the statistics for
            itemCount(int): the total number of items to check
            withQuery(bool): if true include the sparql query

        Returns:
            dict: a statistics row
        """
        ntlod = self.noneTabular(wdProperty)
        statsRow = {"property": wdProperty.plabel}
        total = 0
        nttotal = 0
        maxCount = 0
        for record in ntlod:
            f = int(record["frequency"])
            count = int(record["count"])
            # statsRow[f"f{count}"]=f
            if count > 1:
                # items with more than one value are non tabular
                nttotal += f
            else:
                statsRow["1"] = f
            if count > maxCount:
                maxCount = count
            total += f
        statsRow["maxf"] = maxCount
        if withQuery:
            statsRow["queryf"] = self.noneTabularQuery(wdProperty).query
            statsRow["queryex"] = self.noneTabularQuery(
                wdProperty, asFrequency=False
            ).query
        self.addStatsColWithPercent(statsRow, "total", total, itemCount)
        self.addStatsColWithPercent(statsRow, "non tabular", nttotal, total)
        return statsRow

    def genPropertyStatistics(self):
        """
        generate the property Statistics

        Returns:
            generator: a generator of statistic dict rows
        """
        itemCount, _itemCountQuery = self.count()
        for wdProperty in self.properties.values():
            statsRow = self.genWdPropertyStatistic(wdProperty, itemCount)
            yield statsRow

    def getPropertyStatistics(self):
        """
        get the property Statistics

        Returns:
            list: a list of statistic dict rows starting with a sum row
                for the total item count
        """
        itemCount, _itemCountQuery = self.count()
        lod = [{"property": "∑", "total": itemCount, "total%": 100.0}]
        for wdProperty in self.properties.values():
            statsRow = self.genWdPropertyStatistic(wdProperty, itemCount)
            lod.append(statsRow)
        return lod

__init__(itemQid, propertyLabels=[], propertyIds=[], search_predicate='wdt:P31', where=None, endpointConf=None, lang='en', debug=False)

Constructor

Parameters:

Name Type Description Default
itemQid(str)

wikidata id of the type to analyze

required
propertyLabels(list)

a list of labels of properties to be considered

empty list
propertyIds(list)

a list of ids of properties to be considered

empty list
search_predicate(str)

the search predicate to use e.g. instanceof / subclass of

'wdt:P31'
where(str)

extra where clause for instance selection (if any)

None
endpointConf

the configuration of the SPARQL endpoint to be used (the default endpoint configuration if None)

None
lang(str)

the language to use for labels

'en'
debug(bool)

if True switch on debugging

False
Source code in ez_wikidata/trulytabular.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    itemQid,
    propertyLabels: list = None,
    propertyIds: list = None,
    search_predicate="wdt:P31",
    where: str = None,
    endpointConf=None,
    lang="en",
    debug=False,
):
    """
    Constructor

    Args:
        itemQid(str): wikidata id of the type to analyze
        propertyLabels(list): a list of labels of properties to be considered
            (None is handled as an empty list)
        propertyIds(list): a list of ids of properties to be considered
            (None is handled as an empty list)
        search_predicate(str): the search predicate to use e.g. instanceof / subclass of
        where(str): extra where clause for instance selection (if any)
        endpointConf: the SPARQL endpoint configuration to use - its
            endpoint and method attributes are read here; if None
            Endpoint.getDefault() is used
        lang(str): the language to use for labels
        debug(bool): if True switch on debugging
    """
    # None instead of [] as default avoids sharing one mutable list
    # object between all constructor calls
    if propertyLabels is None:
        propertyLabels = []
    if propertyIds is None:
        propertyIds = []
    self.itemQid = itemQid
    self.debug = debug
    if endpointConf is None:
        endpointConf = Endpoint.getDefault()
    self.endpointConf = endpointConf
    self.wpm = WikidataPropertyManager.get_instance(
        endpoint_url=endpointConf.endpoint
    )
    self.sparql = SPARQL(endpointConf.endpoint, method=self.endpointConf.method)
    self.sparql.debug = self.debug
    self.search_predicate = search_predicate
    # the extra where clause is appended on its own indented line
    self.where = f"\n  {where}" if where is not None else ""
    self.lang = lang
    self.item = WikidataItem(
        itemQid, sparql=self.sparql, lang=lang, debug=self.debug
    )
    self.queryManager = TrulyTabular.getQueryManager(debug=self.debug)
    # properties selected by id first, then extended by those selected by label
    self.properties = self.wpm.get_properties_by_ids(propertyIds)
    self.properties.update(
        self.wpm.get_properties_by_labels(propertyLabels, lang=lang)
    )
    self.isodate = datetime.datetime.now().isoformat()
    # no error recorded yet
    self.error = None

__str__()

Returns:

Name Type Description
str

my text representation

Source code in ez_wikidata/trulytabular.py
74
75
76
77
78
79
def __str__(self):
    """
    get my text representation in the short format

    Returns:
        str: the result of asText with long switched off
    """
    shortText = self.asText(long=False)
    return shortText

addStatsColWithPercent(m, col, value, total)

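add a statistics column together with its percentage column.

Args: m(dict): the statistics row to add the columns to; col(str): name of the column; value: the value; total: the total value the percentage refers to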

Source code in ez_wikidata/trulytabular.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def addStatsColWithPercent(
    self, m: dict, col: str, value: Union[int, float], total: Union[int, float]
):
    """
    store a value and its percentage of a total in a statistics row

    Args:
        m(dict): the row to modify
        col(str): name of the value column - the percentage column
            gets the same name followed by a percent sign
        value: the value to store
        total: the total the percentage refers to - without a
            positive total the percentage is None
    """
    percentCol = f"{col}%"
    hasTotal = total is not None and total > 0
    m[col] = value
    # percentage rounded to one decimal digit
    m[percentCol] = float(f"{value/total*100:.1f}") if hasTotal else None

asText(long=True)

returns my content as a text representation

Parameters:

Name Type Description Default
long(bool)

True if a long format including url is wished

True

Returns:

Name Type Description
str

a text representation of my content

Source code in ez_wikidata/trulytabular.py
105
106
107
108
109
110
111
112
113
114
115
116
def asText(self, long: bool = True):
    """
    get the text representation of my item

    Args:
        long(bool): True if a long format including url is wished

    Returns:
        str: the text representation delivered by my item
    """
    return self.item.asText(long)

count()

get my count

Source code in ez_wikidata/trulytabular.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
    def count(self):
        """
        count the distinct items selected by my search predicate and item

        Returns:
            tuple: (count, query) - count is an int or None if the query
                failed (the exception is then kept in self.error), query
                is the SPARQL query text
        """
        itemText = self.getItemText()
        # the query is assembled line by line
        queryLines = [
            "# Count all items with the given type",
            f"# {itemText}",
            Prefixes.getPrefixes(),
            "SELECT (COUNT (DISTINCT ?item) AS ?count)",
            "WHERE",
            "{",
            f"  # instance of {self.item.qlabel}",
            f"  ?item {self.search_predicate} wd:{self.item.qid}.{self.where}",
            "}",
        ]
        query = "\n".join(queryLines)
        itemCount = None
        try:
            # workaround https://github.com/ad-freiburg/qlever/issues/717
            itemCount = int(self.sparql.getValue(query, "count"))
        except Exception as ex:
            # best effort: keep the problem for the caller to inspect
            self.error = ex
        return itemCount, query

genPropertyStatistics()

generate the property Statistics

Returns:

Name Type Description
generator

a generator of statistic dict rows

Source code in ez_wikidata/trulytabular.py
429
430
431
432
433
434
435
436
437
438
439
def genPropertyStatistics(self):
    """
    lazily generate the statistics rows of all my properties

    Returns:
        generator: a generator of statistic dict rows
    """
    itemCount, _ignoredQuery = self.count()
    yield from (
        self.genWdPropertyStatistic(wdProperty, itemCount)
        for wdProperty in self.properties.values()
    )

genWdPropertyStatistic(wdProperty, itemCount, withQuery=True)

generate a property Statistics Row for the given wikidata Property

Parameters:

Name Type Description Default
wdProperty(WikidataProperty)

the property to get the statistics for

required
itemCount(int)

the total number of items to check

required
withQuery(bool)

if true include the sparql query

True

Returns:

Name Type Description
dict dict

a statistics row

Source code in ez_wikidata/trulytabular.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def genWdPropertyStatistic(
    self, wdProperty: WikidataProperty, itemCount: int, withQuery=True
) -> dict:
    """
    create the statistics row for a single wikidata property

    Args:
        wdProperty(WikidataProperty): the property to get the statistics for
        itemCount(int): the total number of items to check
        withQuery(bool): if true include the sparql queries in the row

    Returns:
        dict: a statistics row
    """
    frequencyRecords = self.noneTabular(wdProperty)
    row = {"property": wdProperty.plabel}
    overall = 0
    nonTabular = 0
    highest = 0
    for rec in frequencyRecords:
        freq = int(rec["frequency"])
        valueCount = int(rec["count"])
        if valueCount > 1:
            # more than one value per item is non tabular
            nonTabular += freq
        else:
            row["1"] = freq
        highest = max(highest, valueCount)
        overall += freq
    row["maxf"] = highest
    if withQuery:
        frequencyQuery = self.noneTabularQuery(wdProperty)
        row["queryf"] = frequencyQuery.query
        recordsQuery = self.noneTabularQuery(wdProperty, asFrequency=False)
        row["queryex"] = recordsQuery.query
    self.addStatsColWithPercent(row, "total", overall, itemCount)
    self.addStatsColWithPercent(row, "non tabular", nonTabular, overall)
    return row

generateSparqlQuery(genMap, listSeparator='⇹', naive=True, lang='en')

generate a SPARQL Query

Parameters:

Name Type Description Default
genMap(dict)

a dictionary of generation items aggregates/ignores/labels

required
listSeparator(str)

the symbol to use as a list separator for GROUP_CONCAT

'⇹'
naive(bool)

if True - generate a naive straight forward SPARQL query; if False - generate a proper truly tabular aggregate query

True
lang(str)

the language to generate for

'en'

Returns:

Name Type Description
str str

the generated SPARQL Query

Source code in ez_wikidata/trulytabular.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
    def generateSparqlQuery(
        self,
        genMap: dict,
        listSeparator: str = "⇹",
        naive: bool = True,
        lang: str = "en",
    ) -> str:
        """
        generate a SPARQL Query

        The query selects my item and its label together with one value
        variable per property. In aggregate mode (naive=False) only the
        properties found in genMap contribute to the SELECT clause and
        GROUP BY / HAVING clauses are added.

        Args:
            genMap(dict): a dictionary of generation items aggregates/ignores/labels
                keyed by property id - each value is a list of entries such as
                "ignore", "label", "list", "count" or the name of another
                aggregate function
            listSeparator(str): the symbol to use as a list separator for GROUP_CONCAT
            naive(bool): if True - generate a naive straight forward SPARQL query
                if False generate a proper truly tabular aggregate query
            lang(str): the language to generate for

        Returns:
            str: the generated SPARQL Query
        """
        # The Wikidata item to generate the query for
        item = self.item
        # the name of this script
        script = Path(__file__).name
        # the mode of generation
        naiveText = "naive" if naive else "aggregate"
        # start with the preamble and PREFIX section
        # select the item and its label
        sparqlQuery = f"""# truly tabular {naiveText} query for 
# {item.qid}:{item.qlabel}
# generated by {script} version {Version.version} on {self.isodate}
{Prefixes.getPrefixes()}
SELECT ?{item.itemVarname} ?{item.labelVarname}"""
        # loop over all properties
        for wdProp in self.properties.values():
            if naive:
                # naive mode: simply select the plain value variable
                sparqlQuery += f"\n  ?{wdProp.valueVarname}"
            else:
                if wdProp.pid in genMap:
                    genList = genMap[wdProp.pid]
                    for aggregate in genList:
                        if not aggregate in ["ignore", "label"]:
                            distinct = ""
                            if aggregate == "list":
                                # lists are concatenated distinct values
                                aggregateFunc = "GROUP_CONCAT"
                                aggregateParam = f';SEPARATOR="{listSeparator}"'
                                distinct = "DISTINCT "
                            else:
                                if aggregate == "count":
                                    distinct = "DISTINCT "
                                # any other entry is used as the aggregate function name
                                aggregateFunc = aggregate.upper()
                                aggregateParam = ""
                            sparqlQuery += f"\n  ({aggregateFunc} ({distinct}?{wdProp.valueVarname}{aggregateParam}) AS ?{wdProp.valueVarname}_{aggregate})"
                        elif aggregate == "label":
                            sparqlQuery += f"\n  ?{wdProp.labelVarname}"
                        elif aggregate == "ignore" and not "label" in genList:
                            # ignored properties show their plain value
                            # unless the label is selected already
                            sparqlQuery += f"\n  ?{wdProp.valueVarname}"
        # open the where clause selecting the items and their labels
        sparqlQuery += f"""
WHERE {{
  # instanceof {item.qid}:{item.qlabel}
  ?{item.itemVarname} {self.search_predicate} wd:{item.qid}.
  # label
  ?{item.itemVarname} rdfs:label ?{item.labelVarname}.  
  FILTER (LANG(?{item.labelVarname}) = "{lang}").
"""
        # add one OPTIONAL clause per property
        for wdProp in self.properties.values():
            sparqlQuery += f"""  # {wdProp}
  OPTIONAL {{ 
    ?{item.itemVarname} wdt:{wdProp.pid} ?{wdProp.valueVarname}. """
            if wdProp.pid in genMap:
                genList = genMap[wdProp.pid]
                if "label" in genList:
                    # fetch the label of the value in the requested language
                    sparqlQuery += f"""\n    ?{wdProp.valueVarname} rdfs:label ?{wdProp.labelVarname}."""
                    sparqlQuery += (
                        f"""\n    FILTER (LANG(?{wdProp.labelVarname}) = "{lang}")."""
                    )
            sparqlQuery += "\n  }\n"
        # close where Clause
        sparqlQuery += """}\n"""
        # optionally add Aggregate
        if not naive:
            sparqlQuery += f"""GROUP BY
  ?{item.itemVarname} 
  ?{item.labelVarname}
"""
            # the non aggregated variables of the SELECT are grouped too
            for wdProp in self.properties.values():
                if wdProp.pid in genMap:
                    genList = genMap[wdProp.pid]
                    if "label" in genList:
                        sparqlQuery += f"\n  ?{wdProp.labelVarname}"
                    if "ignore" in genList and not "label" in genList:
                        sparqlQuery += f"\n  ?{wdProp.valueVarname}"
            # properties marked "ignore" may have at most one value per item;
            # the HAVING clause is only opened when the first one is found
            havingCount = 0
            havingDelim = "   "
            for wdProp in self.properties.values():
                if wdProp.pid in genMap:
                    genList = genMap[wdProp.pid]
                    if "ignore" in genList:
                        havingCount += 1
                        if havingCount == 1:
                            sparqlQuery += f"\nHAVING ("

                        sparqlQuery += (
                            f"\n  {havingDelim}COUNT(?{wdProp.valueVarname})<=1"
                        )
                        # all conditions after the first are and-combined
                        havingDelim = "&& "
            if havingCount > 0:
                sparqlQuery += f"\n)"
        return sparqlQuery

getPropertyStatistics()

get the property Statistics

Source code in ez_wikidata/trulytabular.py
441
442
443
444
445
446
447
448
449
450
def getPropertyStatistics(self):
    """
    Build the statistics table for the properties of this item.

    Returns:
        list: a list of dicts - first a "∑" summary row with the total
        item count, then one row per property as delivered by
        genWdPropertyStatistic
    """
    total, _countQuery = self.count()
    summaryRow = {"property": "∑", "total": total, "total%": 100.0}
    propertyRows = [
        self.genWdPropertyStatistic(prop, total)
        for prop in self.properties.values()
    ]
    return [summaryRow, *propertyRows]

getQueryManager(lang='sparql', name='trulytabular', debug=False) classmethod

get the query manager for the given language and fileName

Parameters:

Name Type Description Default
lang(str)

the language of the queries to extract

required
name(str)

the name of the manager containing the query specifications

required
debug(bool)

if True set debugging on

required
Source code in ez_wikidata/trulytabular.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@classmethod
def getQueryManager(cls, lang="sparql", name="trulytabular", debug=False):
    """
    Look up the YAML query file for the given name and wrap it in a QueryManager.

    The candidate locations are supplied by YamlPath.getPaths; the first
    candidate that is an existing file is used.

    Args:
        lang(str): the language of the queries to extract
        name(str): the name of the manager containing the query specifications
            (the file looked up is "<name>.yaml")
        debug(bool): if True set debugging on

    Returns:
        QueryManager: the manager for the first existing YAML file or
        None if none of the candidate paths is a file
    """
    yamlCandidates = YamlPath.getPaths(f"{name}.yaml")
    firstExisting = next(
        (candidate for candidate in yamlCandidates if os.path.isfile(candidate)),
        None,
    )
    if firstExisting is None:
        return None
    return QueryManager(lang=lang, debug=debug, queriesPath=firstExisting)

mostFrequentPropertiesQuery(whereClause=None, minCount=0)

get the most frequently used properties

Parameters:

Name Type Description Default
whereClause(str)

an extra WhereClause to use

required
Source code in ez_wikidata/trulytabular.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
    def mostFrequentPropertiesQuery(self, whereClause: str = None, minCount: int = 0):
        """
        get the most frequently used properties

        Builds a SPARQL query that counts the distinct items per property
        for the items matched by the where clause and wraps it in a Query.
        The generated text differs depending on self.endpointConf.database
        ("qlever", "blazegraph" or anything else).

        Args:
            whereClause(str): an extra WhereClause to use - if None a default
                clause is derived from the search predicate and the item Qid
            minCount(int): if greater than zero only properties with a count
                greater than this value are kept

        Returns:
            Query: the query for the most frequently used properties
        """
        if whereClause is None:
            whereClause = f"?item {self.search_predicate} wd:{self.itemQid}"
            # for qlever the predicates are obtained via ql:has-predicate below
            if self.endpointConf.database != "qlever":
                whereClause += ";?p ?id"
        # terminate the triple pattern - this also applies to a caller supplied clause
        whereClause += "."
        minCountFilter = ""
        if minCount > 0:
            minCountFilter = f"\n  FILTER(?count >{minCount})."
        itemText = self.getItemText()
        sparqlQuery = f"""# get the most frequently used properties for
# {itemText}
{Prefixes.getPrefixes()}
SELECT ?prop ?propLabel ?wbType ?count WHERE {{
  {{"""
        # head of the counting subquery: qlever groups by the predicate ?p,
        # the other endpoints by the property entity ?prop
        if self.endpointConf.database == "qlever":
            sparqlQuery += f"""
    SELECT ?p (COUNT(DISTINCT ?item) AS ?count) WHERE {{"""
        else:
            sparqlQuery += f"""
    SELECT ?prop (COUNT(DISTINCT ?item) AS ?count) WHERE {{"""
        if self.endpointConf.database == "blazegraph":
            # blazegraph only: add the query hint that sets the optimizer to "None"
            sparqlQuery += f"""
      hint:Query hint:optimizer "None"."""
        sparqlQuery += f"""
      {whereClause}"""
        # close the subquery; qlever maps ?p to ?prop outside of the subquery,
        # the other endpoints do so inside of it
        if self.endpointConf.database == "qlever":
            sparqlQuery += f"""  
      ?item ql:has-predicate ?p 
    }} GROUP BY ?p
  }}
  ?prop wikibase:directClaim ?p."""
        else:
            sparqlQuery += f"""
      ?prop wikibase:directClaim ?p.
    }}
    GROUP BY ?prop ?propLabel
  }}"""
        # label and datatype of the property plus the optional minimum count filter
        sparqlQuery += f"""
  ?prop rdfs:label ?propLabel.
  ?prop wikibase:propertyType ?wbType.
  FILTER(LANG(?propLabel) = "{self.lang}").{minCountFilter}  
}}
ORDER BY DESC (?count)
"""
        title = f"most frequently used properties for {self.item.asText(long=True)}"
        query = Query(
            name=f"mostFrequentProperties for {itemText}",
            query=sparqlQuery,
            title=title,
        )
        return query

noneTabular(wdProperty)

get the non-tabular result for the given Wikidata property

Parameters:

Name Type Description Default
wdProperty(WikidataProperty)

the Wikidata property

required
Source code in ez_wikidata/trulytabular.py
359
360
361
362
363
364
365
366
367
368
369
370
def noneTabular(self, wdProperty: WikidataProperty):
    """
    Run the non-tabular query for the given Wikidata property.

    Args:
        wdProperty(WikidataProperty): the Wikidata property

    Returns:
        the list of dicts delivered by self.sparql.queryAsListOfDicts
        for the query created by noneTabularQuery
    """
    sparqlText = self.noneTabularQuery(wdProperty).query
    if self.debug:
        # show the generated SPARQL when debugging
        logging.info(sparqlText)
    return self.sparql.queryAsListOfDicts(sparqlText)

noneTabularQuery(wdProperty, asFrequency=True)

get the non-tabular entries for the given property

Parameters:

Name Type Description Default
wdProperty(WikidataProperty)

the property to analyze

required
asFrequency(bool)

if true do a frequency analysis

required
Source code in ez_wikidata/trulytabular.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
    def noneTabularQuery(self, wdProperty: WikidataProperty, asFrequency: bool = True):
        """
        get the none tabular entries for the given property

        The base query counts the values of the property per item. Depending
        on asFrequency it is either wrapped into a frequency analysis of
        these counts or restricted to items with more than one value.

        Args:
            wdProperty(WikidataProperty): the property to analyze
            asFrequency(bool): if true do a frequency analysis

        Returns:
            Query: the query with name, title and the SPARQL text including
            a comment header and the prefixes from Prefixes.getPrefixes()
        """
        propertyLabel = wdProperty.plabel
        propertyId = wdProperty.pid
        # work around https://github.com/RDFLib/sparqlwrapper/issues/211
        if "described at" in propertyLabel:
            propertyLabel = propertyLabel.replace("described at", "describ'd at")
        # base query: number of values of the property per item
        sparql = f"""SELECT ?item ?itemLabel (COUNT (?value) AS ?count)
WHERE
{{
  # instance of {self.item.qlabel}
  ?item {self.search_predicate} wd:{self.itemQid}.{self.where}
  ?item rdfs:label ?itemLabel.
  FILTER (LANG(?itemLabel) = "{self.lang}").
  # {propertyLabel}
  ?item {wdProperty.getPredicate()} ?value.
}} GROUP BY ?item ?itemLabel
"""
        if asFrequency:
            freqDesc = "frequencies"
            # the quadruple braces emit two literal braces on each side
            # around the base query, which becomes a subquery
            sparql = f"""SELECT ?count (COUNT(?count) AS ?frequency) WHERE {{{{
{sparql}
}}}}
GROUP BY ?count
ORDER BY DESC (?frequency)"""
        else:
            freqDesc = "records"
            # keep only the items that have more than one value
            sparql = f"""{sparql}
HAVING (COUNT (?value) > 1)
ORDER BY DESC(?count)"""
        itemText = self.getItemText()
        # prepend the comment header and the prefix declarations
        sparql = (
            f"""# Count all {itemText} items
# with the given {propertyLabel}({propertyId}) https://www.wikidata.org/wiki/Property:{propertyId} 
{Prefixes.getPrefixes()}
"""
            + sparql
        )
        title = f"non tabular entries for {self.item.qlabel}/{propertyLabel}:{freqDesc}"
        name = f"NonTabular {self.item.qlabel}/{propertyLabel}:{freqDesc}"
        query = Query(query=sparql, name=name, title=title)
        return query

version

Created on 2024-03-01

@author: wf

Version dataclass

Bases: object

Version handling for easy wikidata access

Source code in ez_wikidata/version.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@dataclass
class Version:
    """
    Version and project metadata for easy wikidata access
    """

    # project identity
    name = "py_ez_wikidata"
    version = ez_wikidata.__version__
    # creation date and date of the last update
    date = "2024-03-01"
    updated = "2024-08-09"
    description = "Mapping for Wikidata allows creation of wikidata entries from dicts"

    authors = "Tim Holzheim, Wolfgang Fahl"

    # documentation, discussion and source code locations
    doc_url = "https://wiki.bitplan.com/index.php/Py_ez_wikidata"
    chat_url = "https://github.com/WolfgangFahl/py_ez_wikidata/discussions"
    cm_url = "https://github.com/WolfgangFahl/py_ez_wikidata"

    # license notice
    license = (
        "Copyright 2024 contributors. All rights reserved.\n"
        "\n"
        "  Licensed under the Apache License 2.0\n"
        "  http://www.apache.org/licenses/LICENSE-2.0\n"
        "\n"
        '  Distributed on an "AS IS" basis without warranties\n'
        "  or conditions of any kind, either express or implied."
    )
    # summary assembled from the attributes above
    longDescription = (
        f"{name} version {version}\n"
        f"{description}\n"
        "\n"
        f"  Created by {authors} on {date} last updated {updated}"
    )

wbquery

Created on 2022-04-30

@author: wf

WikibaseQuery

Bases: object

a Query for Wikibase

Source code in ez_wikidata/wbquery.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
class WikibaseQuery(object):
    """
    a Query for Wikibase

    Collects the property description rows of one entity and generates
    SPARQL text from them: a SELECT query (asSparql), a VALUES clause
    (getValuesClause) and an IN filter (inFilter).
    """

    def __init__(
        self, entity: str, wpm: WikidataPropertyManager = None, debug: bool = False
    ):
        """
        Constructor

        Args:
            entity(str): the entity this query represents
            wpm(WikidataPropertyManager): the property manager to use - if None
                WikidataPropertyManager.get_instance() supplies it
            debug(bool): if True switch on debugging
        """
        self.debug = debug
        self.entity = entity
        if wpm is None:
            wpm = WikidataPropertyManager.get_instance()
        self.wpm = wpm
        # lookups for the description rows by different keys
        self.propertiesByName = {}
        self.propertiesById = {}
        self.propertiesByVarname = {}
        self.propertiesByColumn = {}
        # the description rows in the order they were added
        self.rows = []

    def get_property_mappings(self) -> List[PropertyMapping]:
        """
        Get the property mappings as PropertyMapping list

        Returns:
            List[PropertyMapping]: list of PropertyMappings
        """
        prop_maps = self.wpm.get_mappings_for_records(self.propertiesByColumn)
        return prop_maps

    def get_item_mapping(self) -> PropertyMapping:
        """
        Get the mapping that describes the wikidata entity item
        """
        return PropertyMapping.get_item_mapping(self.get_property_mappings())

    def addPropertyFromDescriptionRow(self, row):
        """
        add a property from the given row

        The row is registered in all lookups and gets its "PropVarname"
        entry set to a name usable as a SPARQL variable.

        Args:
            row(dict): the row to add - needs the keys "PropertyName",
                "PropertyId" and "Column"; "PropVarname" is optional and
                defaults to the property name
        """
        self.rows.append(row)
        propName = row["PropertyName"]
        propId = row["PropertyId"]
        column = row["Column"]
        # properties might contain blanks or dashes - replace for SPARQL variable names
        propVarname = row.get("PropVarname", propName)
        propVarname = propVarname.replace(" ", "_")
        propVarname = propVarname.replace("-", "_")
        row["PropVarname"] = propVarname
        # set the values of the lookups
        self.propertiesByName[propName] = row
        self.propertiesByColumn[column] = row
        self.propertiesById[propId] = row
        self.propertiesByVarname[propVarname] = row

    def getColumnTypeAndVarname(self, propName: str) -> (str, str, str):
        """
        get a signature tuple consisting of column name, property type and
        SPARQL variable name for the given property name

        Args:
            propName(str): the name of the property

        Raises:
            Exception: if property name is not known

        Returns:
            column,propType,varName tuple
        """
        if propName not in self.propertiesByName:
            raise Exception(
                f"unknown property name {propName} for entity {self.entity}"
            )
        propRow = self.propertiesByName[propName]
        column = propRow["Column"]
        propType = propRow["Type"]
        varName = propRow["PropVarname"]
        # an item without explicit variable name is addressed as ?item
        if propType == "item" and varName in [None, ""]:
            varName = "item"
        return column, propType, varName

    @staticmethod
    def _escape_literal(text: str) -> str:
        """
        escape the given text for use inside a single quoted SPARQL literal

        Args:
            text(str): the raw text

        Returns:
            str: the text with backslashes and single quotes escaped
        """
        # backslash first - otherwise the backslash added for the quote would be doubled
        return text.replace("\\", "\\\\").replace("'", "\\'")

    def inFilter(
        self, values: list, propName: str = "short_name", lang: str = "en"
    ) -> str:
        """
        create a SPARQL IN filter clause

        Backslashes and single quotes in the values are escaped so that
        a value like O'Brien does not terminate the literal early.

        Args:
            values(list): the list of values to filter for
            propName(str): the property name to filter with
            lang(str): the language to apply

        Returns:
            str: the SPARQL FILTER clause
        """
        literals = [
            f"\n    '{self._escape_literal(str(value))}'@{lang}" for value in values
        ]
        return f"\n  FILTER(?{propName} IN(" + ",".join(literals) + "\n  ))."

    def getValuesClause(
        self,
        values: list,
        propVarname: str = "short_name",
        propType: str = "text",
        lang: str = None,
        ignoreEmpty: bool = True,
        wbPrefix: str = "http://www.wikidata.org/entity/",
    ):
        """
        create a SPARQL Values clause

        Args:
            values(list): the list of values to create values for
            propVarname(str): the property variable name to assign the values for
            propType(str): the type of the values - "item", "itemid", "" and None
                create wd: entity references, everything else literals
            lang(str): language tag for the literals - only applied for propType "text"
            ignoreEmpty(bool): ignore empty values if True
            wbPrefix(str): a wikibase/wikidata prefix to be removed for items values
        Returns:
            str: the SPARQL values clause
        """
        # a language tag is only added to text literals
        langTag = f"@{lang}" if lang is not None and propType == "text" else ""
        isItemType = propType in ["item", "itemid", "", None]
        parts = [f"\n  VALUES(?{propVarname}) {{"]
        for value in values:
            if ignoreEmpty and not value:
                continue
            if isItemType:
                # strip only the leading prefix; non string values are used as is
                if isinstance(value, str) and value.startswith(wbPrefix):
                    value = value[len(wbPrefix) :]
                parts.append(f"\n   ( wd:{value} )")
            elif isinstance(value, str):
                parts.append(f"\n  ( '{self._escape_literal(value)}'{langTag} )")
            else:
                parts.append(f"\n  ( {str(value)} )")
        parts.append("\n  }.")
        return "".join(parts)

    def asSparql(
        self,
        filterClause: str = None,
        orderClause: str = None,
        pk: str = None,
        lang: str = "en",
    ) -> str:
        """
        get the sparqlQuery for this query optionally applying a filterClause

        Args:
            filterClause(str): a filter to be applied (if any)
            orderClause(str): an orderClause to be applied (if any)
            pk(str): primaryKey (if any) - the property with this name is not optional
            lang(str): the language to be used for labels

        Returns:
            str: the SPARQL query text
        """
        item_mapping = self.get_item_mapping()
        item_varname = item_mapping.varname
        sparql = f"""# 
# get {self.entity} records 
#  
PREFIX pq: <http://www.wikidata.org/prop/qualifier/>
PREFIX p: <http://www.wikidata.org/prop/>
PREFIX schema: <http://schema.org/>
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
PREFIX wikibase: <http://wikiba.se/ontology#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?{item_varname} ?{item_varname}Label ?{item_varname}Description
"""
        # select list: one variable per property without a predefined value
        for prop_map in self.get_property_mappings():
            if prop_map.is_item_itself():
                continue
            if not prop_map.value and prop_map.varname:
                property_selections = f"\n  ?{prop_map.varname}"
                if prop_map.property_type_enum is WdDatatype.itemid:
                    # items will automatically fetch labels
                    property_selections += f" ?{prop_map.varname}Label"
                elif prop_map.property_type_enum is WdDatatype.extid:
                    # extid' will automatically fetch formatted URIs
                    property_selections += f" ?{prop_map.varname}Url"
                sparql += property_selections
        query_item_label = f"""?{item_varname} rdfs:label ?{item_varname}Label. FILTER(LANG(?{item_varname}Label) = "{lang}")"""
        query_item_desc = f"""?{item_varname} schema:description ?{item_varname}Description. FILTER(LANG(?{item_varname}Description) = "{lang}")"""
        # the label is mandatory, the description optional
        sparql += f"""\nWHERE {{
    {query_item_label}
    OPTIONAL {{
        {query_item_desc}
    }}
"""
        for prop_map in self.get_property_mappings():
            if prop_map.propertyId in [None, ""]:
                continue
            if prop_map.value:
                # value predefined for property
                sparql += f"\n  ?{item_varname} wdt:{prop_map.propertyId} wd:{prop_map.value}."
            else:
                if prop_map.varname:
                    # primary keys are not optional
                    optional = pk is None or not prop_map.propertyName == pk
                    if optional:
                        sparql += "\n  OPTIONAL {"
                    sparql += f"\n    ?{item_varname} wdt:{prop_map.propertyId} ?{prop_map.varname}."
                    if prop_map.property_type_enum is WdDatatype.itemid:
                        # also query label of the qid with language lang
                        sparql += f"\n    ?{prop_map.varname} rdfs:label ?{prop_map.varname}Label."
                        sparql += f"""\n    FILTER(LANG(?{prop_map.varname}Label) = "{lang}")"""
                    elif prop_map.property_type_enum is WdDatatype.extid:
                        # ToDo: decision to make see https://github.com/WolfgangFahl/PyGenericSpreadSheet/issues/15
                        sparql += f"\n    wd:{prop_map.propertyId} wdt:P1630 ?{prop_map.varname}FormatterUrl."
                        sparql += f"\n    BIND(IRI(REPLACE(?{prop_map.varname}, '^(.+)$', ?{prop_map.varname}FormatterUrl)) AS ?{prop_map.varname}Url)."
                    if optional:
                        sparql += "\n  }"
        if filterClause is not None:
            sparql += f"\n{filterClause}"
        sparql += "\n}"
        if orderClause is not None:
            sparql += f"\n{orderClause}"
        return sparql

    @classmethod
    def ofMapRows(
        cls, entityMapRows: list, debug: bool = False
    ) -> Dict[str, "WikibaseQuery"]:
        """
        create a dict of wikibaseQueries from the given entityMap list of dicts

        Rows without an "Entity" key are skipped. The queries are created
        via cls.ofEntityMap so that subclasses get instances of their own type.

        Args:
            entityMapRows(list): a list of dict with row descriptions
            debug(bool): if True pretty print the rows grouped by entity

        Returns:
            Dict[str, WikibaseQuery]: the queries by entity name
        """
        entityMapDict = {}
        for row in entityMapRows:
            if "Entity" not in row:
                continue
            # an entity is registered even if its row has no property name
            entityRows = entityMapDict.setdefault(row["Entity"], {})
            if "PropertyName" in row:
                entityRows[row["PropertyName"]] = row
        if debug:
            pprint.pprint(entityMapDict)
        return {
            entity: cls.ofEntityMap(entity, entityRows)
            for entity, entityRows in entityMapDict.items()
        }

    @classmethod
    def ofEntityMap(cls, entity: str, entityMap: dict) -> "WikibaseQuery":
        """
        create a WikibaseQuery for the given entity and entityMap

        Args:
            entity(str): the entity name
            entityMap(dict): the entity property descriptions
        Returns:
            WikibaseQuery: an instance of the class this method is called on
        """
        wbQuery = cls(entity)
        for row in entityMap.values():
            wbQuery.addPropertyFromDescriptionRow(row)
        return wbQuery

__init__(entity, wpm=None, debug=False)

Constructor

Parameters:

Name Type Description Default
entity(str)

the entity this query represents

required
debug(bool)

if True switch on debugging

required
Source code in ez_wikidata/wbquery.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(
    self, entity: str, wpm: WikidataPropertyManager = None, debug: bool = False
):
    """
    Set up an empty query for the given entity.

    Args:
        entity(str): the entity this query represents
        wpm(WikidataPropertyManager): the property manager to use - if None
            WikidataPropertyManager.get_instance() supplies it
        debug(bool): if True switch on debugging
    """
    self.debug, self.entity = debug, entity
    self.wpm = WikidataPropertyManager.get_instance() if wpm is None else wpm
    # lookups for the description rows - all start empty
    self.propertiesByName = dict()
    self.propertiesById = dict()
    self.propertiesByVarname = dict()
    self.propertiesByColumn = dict()
    # the description rows themselves
    self.rows = list()

addPropertyFromDescriptionRow(row)

add a property from the given row

Parameters:

Name Type Description Default
row(dict)

the row to add

required
Source code in ez_wikidata/wbquery.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def addPropertyFromDescriptionRow(self, row):
    """
    Register a property description row in this query.

    The row is appended to the rows, gets its "PropVarname" entry
    normalized and is made available in all four lookups.

    Args:
        row(dict): the row to add
    """
    self.rows.append(row)
    name, pid, columnName = row["PropertyName"], row["PropertyId"], row["Column"]
    # blanks and dashes are replaced by underscores for the SPARQL
    # variable name; the property name is the fallback
    rawVarname = row.get("PropVarname", name)
    varname = rawVarname.translate(str.maketrans(" -", "__"))
    row["PropVarname"] = varname
    # make the row available by name, column, id and variable name
    for lookup, key in (
        (self.propertiesByName, name),
        (self.propertiesByColumn, columnName),
        (self.propertiesById, pid),
        (self.propertiesByVarname, varname),
    ):
        lookup[key] = row

asSparql(filterClause=None, orderClause=None, pk=None, lang='en')

get the sparqlQuery for this query optionally applying a filterClause

Parameters:

Name Type Description Default
filterClause(str)

a filter to be applied (if any)

required
orderClause(str)

an orderClause to be applied (if any)

required
pk(str)

primaryKey (if any)

required
lang(str)

the language to be used for labels

required
Source code in ez_wikidata/wbquery.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
    def asSparql(
        self,
        filterClause: str = None,
        orderClause: str = None,
        pk: str = None,
        lang: str = "en",
    ) -> str:
        """
        get the sparqlQuery for this query optionally applying a filterClause

        The query selects the item with its label and description plus one
        variable per property mapping that has a variable name and no
        predefined value.

        Args:
            filterClause(str): a filter to be applied (if any)
            orderClause(str): an orderClause to be applied (if any)
            pk(str): primaryKey (if any) - the property with this name is
                queried without an OPTIONAL wrapper
            lang(str): the language to be used for labels

        Returns:
            str: the SPARQL query text
        """
        item_mapping = self.get_item_mapping()
        item_varname = item_mapping.varname
        # comment header, prefix declarations and the item part of the select list
        sparql = f"""# 
# get {self.entity} records 
#  
PREFIX pq: <http://www.wikidata.org/prop/qualifier/>
PREFIX p: <http://www.wikidata.org/prop/>
PREFIX schema: <http://schema.org/>
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
PREFIX wikibase: <http://wikiba.se/ontology#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?{item_varname} ?{item_varname}Label ?{item_varname}Description
"""
        # select list: add the variables of the properties
        for prop_map in self.get_property_mappings():
            if prop_map.is_item_itself():
                continue
            if not prop_map.value and prop_map.varname:
                property_selections = f"\n  ?{prop_map.varname}"
                if prop_map.property_type_enum is WdDatatype.itemid:
                    # items will automatically fetch labels
                    property_selections += f" ?{prop_map.varname}Label"
                elif prop_map.property_type_enum is WdDatatype.extid:
                    # extid' will automatically fetch formatted URIs
                    property_selections += f" ?{prop_map.varname}Url"
                sparql += property_selections
        query_item_label = f"""?{item_varname} rdfs:label ?{item_varname}Label. FILTER(LANG(?{item_varname}Label) = "{lang}")"""
        query_item_desc = f"""?{item_varname} schema:description ?{item_varname}Description. FILTER(LANG(?{item_varname}Description) = "{lang}")"""
        # where clause: the label is mandatory, the description optional
        sparql += f"""\nWHERE {{
    {query_item_label}
    OPTIONAL {{
        {query_item_desc}
    }}
"""
        # where clause: one pattern per property that has a property id
        for prop_map in self.get_property_mappings():
            if prop_map.propertyId in [None, ""]:
                continue
            if prop_map.value:
                # value predefined for property
                sparql += f"\n  ?{item_varname} wdt:{prop_map.propertyId} wd:{prop_map.value}."
            else:
                if prop_map.varname:
                    # primary keys are not optional
                    optional = pk is None or not prop_map.propertyName == pk
                    if optional:
                        sparql += "\n  OPTIONAL {"
                    sparql += f"\n    ?{item_varname} wdt:{prop_map.propertyId} ?{prop_map.varname}."
                    if prop_map.property_type_enum is WdDatatype.itemid:
                        # also query label of the qid with language lang
                        sparql += f"\n    ?{prop_map.varname} rdfs:label ?{prop_map.varname}Label."
                        sparql += f"""\n    FILTER(LANG(?{prop_map.varname}Label) = "{lang}")"""
                    elif prop_map.property_type_enum is WdDatatype.extid:
                        # ToDo: decision to make see https://github.com/WolfgangFahl/PyGenericSpreadSheet/issues/15
                        # the url is built by replacing the whole value with the
                        # formatter url obtained via wdt:P1630
                        sparql += f"\n    wd:{prop_map.propertyId} wdt:P1630 ?{prop_map.varname}FormatterUrl."
                        sparql += f"\n    BIND(IRI(REPLACE(?{prop_map.varname}, '^(.+)$', ?{prop_map.varname}FormatterUrl)) AS ?{prop_map.varname}Url)."
                    if optional:
                        sparql += "\n  }"
        # the filter goes inside the where clause, the order clause after it
        if filterClause is not None:
            sparql += f"\n{filterClause}"
        sparql += "\n}"
        if orderClause is not None:
            sparql += f"\n{orderClause}"
        return sparql

getColumnTypeAndVarname(propName)

get a signature tuple consisting of column name, property type and SPARQL variable name for the given property name

Parameters:

Name Type Description Default
propName(str)

the name of the property

required

Raises:

Type Description
Exception

if property name is not known

Returns:

Type Description
(str, str, str)

column, propType, varName tuple

Source code in ez_wikidata/wbquery.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def getColumnTypeAndVarname(self, propName: str) -> (str, str, str):
    """
    Look up column name, property type and SPARQL variable name of a property.

    Args:
        propName(str): the name of the property

    Raises:
        Exception: if property name is not known

    Returns:
        tuple: (column, propType, varName) - for a property of type "item"
        without a variable name the variable name is "item"
    """
    # guard: unknown properties are reported right away
    if propName not in self.propertiesByName:
        raise Exception(
            f"unknown property name {propName} for entity {self.entity}"
        )
    description = self.propertiesByName[propName]
    column, propType = description["Column"], description["Type"]
    varName = description["PropVarname"]
    isUnnamedItem = propType == "item" and varName in (None, "")
    return column, propType, ("item" if isUnnamedItem else varName)

getValuesClause(values, propVarname='short_name', propType='text', lang=None, ignoreEmpty=True, wbPrefix='http://www.wikidata.org/entity/')

create a SPARQL Values clause

Parameters:

Name Type Description Default
values(list)

the list of values to create values for

required
propVarname(str)

the property variable name to assign the values for

required
propType str
'text'
lang str

language of labels to query

None
ignoreEmpty(bool)

ignore empty values if True

required
wbPrefix(str)

a wikibase/wikidata prefix to be removed for items values

required

Returns: str: the SPARQL values clause

Source code in ez_wikidata/wbquery.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def getValuesClause(
    self,
    values: list,
    propVarname: str = "short_name",
    propType: str = "text",
    lang: str = None,
    ignoreEmpty: bool = True,
    wbPrefix: str = "http://www.wikidata.org/entity/",
):
    """
    create a SPARQL Values clause

    String literals get backslashes and single quotes escaped so that
    such characters cannot terminate or corrupt the literal.

    Args:
        values(list): the list of values to create values for
        propVarname(str): the property variable name to assign the values for
        propType(str): the type of the values - "item", "itemid", "" and None
            create wd: entity references, everything else literals
        lang(str): language tag for the literals - only applied for propType "text"
        ignoreEmpty(bool): ignore empty values if True
        wbPrefix(str): a wikibase/wikidata prefix to be removed for items values
    Returns:
        str: the SPARQL values clause
    """
    # a language tag is only added to text literals
    langTag = f"@{lang}" if lang is not None and propType == "text" else ""
    isItemType = propType in ["item", "itemid", "", None]
    parts = [f"\n  VALUES(?{propVarname}) {{"]
    for value in values:
        if ignoreEmpty and not value:
            continue
        if isItemType:
            # strip only the leading prefix; non string values are used as is
            if isinstance(value, str) and value.startswith(wbPrefix):
                value = value[len(wbPrefix) :]
            parts.append(f"\n   ( wd:{value} )")
        elif isinstance(value, str):
            # backslash first - otherwise the backslash added for the
            # quote would be doubled
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            parts.append(f"\n  ( '{escaped}'{langTag} )")
        else:
            parts.append(f"\n  ( {str(value)} )")
    parts.append("\n  }.")
    return "".join(parts)

get_item_mapping()

Get the mapping that describes the wikidata entity item

Source code in ez_wikidata/wbquery.py
49
50
51
52
53
def get_item_mapping(self) -> PropertyMapping:
    """
    Get the mapping that describes the wikidata entity item

    Returns:
        PropertyMapping: the result of PropertyMapping.get_item_mapping
        for the property mappings of this query
    """
    mappings = self.get_property_mappings()
    return PropertyMapping.get_item_mapping(mappings)

get_property_mappings()

Get the property mappings as PropertyMapping list

Returns:

Type Description
List[PropertyMapping]

List[PropertyMapping]: list of PropertyMappings

Source code in ez_wikidata/wbquery.py
39
40
41
42
43
44
45
46
47
def get_property_mappings(self) -> List[PropertyMapping]:
    """
    Get the property mappings for the rows of this query.

    The rows are handed to the property manager keyed by column.

    Returns:
        List[PropertyMapping]: list of PropertyMappings
    """
    return self.wpm.get_mappings_for_records(self.propertiesByColumn)

inFilter(values, propName='short_name', lang='en')

create a SPARQL IN filter clause

Parameters:

Name Type Description Default
values(list)

the list of values to filter for

required
propName(str)

the property name to filter with

required
lang(str)

the language to apply

required
Source code in ez_wikidata/wbquery.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def inFilter(
    self, values: list, propName: str = "short_name", lang: str = "en"
) -> str:
    """
    create a SPARQL IN filter clause

    Backslashes and single quotes in the values are escaped so that
    a value like O'Brien does not terminate the literal early.

    Args:
        values(list): the list of values to filter for
        propName(str): the property name to filter with
        lang(str): the language to apply

    Returns:
        str: the SPARQL FILTER clause
    """
    literals = []
    for value in values:
        # backslash first - otherwise the backslash added for the
        # quote would be doubled
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        literals.append(f"\n    '{escaped}'@{lang}")
    return f"\n  FILTER(?{propName} IN(" + ",".join(literals) + "\n  ))."

ofEntityMap(entity, entityMap) classmethod

create a WikibaseQuery for the given entity and entityMap

Parameters:

Name Type Description Default
entity(str)

the entity name

required
entityMap(dict)

the entity property descriptions

required

Returns: WikibaseQuery

Source code in ez_wikidata/wbquery.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
@classmethod
def ofEntityMap(cls, entity: str, entityMap: dict) -> "WikibaseQuery":
    """
    Build the WikibaseQuery for one entity from its property descriptions.

    Args:
        entity(str): the entity name
        entityMap(dict): the entity property descriptions; only the values
            are used, each one as a description row
    Returns:
        WikibaseQuery: the query with one property added per description row
    """
    query = WikibaseQuery(entity)
    description_rows = entityMap.values()
    for description_row in description_rows:
        query.addPropertyFromDescriptionRow(description_row)
    return query

ofMapRows(entityMapRows, debug=False) classmethod

create a dict of wikibaseQueries from the given entityMap list of dicts

Parameters:

Name Type Description Default
entityMapRows(list)

a list of dict with row descriptions

required
debug(bool)

if True switch on debugging

False
Source code in ez_wikidata/wbquery.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
@classmethod
def ofMapRows(
    cls, entityMapRows: list, debug: bool = False
) -> Dict[str, "WikibaseQuery"]:
    """
    Group description rows by entity and build one WikibaseQuery per entity.

    Rows without an "Entity" key are ignored. Within an entity the rows are
    keyed by their "PropertyName"; a row without that key still registers
    its entity but contributes no property.

    Args:
        entityMapRows(list): a list of dict with row descriptions
        debug(bool): if True pretty-print the grouped rows

    Returns:
        Dict[str, WikibaseQuery]: the queries by entity name
    """
    rows_by_entity = {}
    for description_row in entityMapRows:
        if "Entity" not in description_row:
            continue
        entity_rows = rows_by_entity.setdefault(description_row["Entity"], {})
        if "PropertyName" in description_row:
            entity_rows[description_row["PropertyName"]] = description_row
    if debug:
        pprint.pprint(rows_by_entity)
    return {
        name: WikibaseQuery.ofEntityMap(name, rows)
        for name, rows in rows_by_entity.items()
    }

wdproperty

Created on 2024-03-02

@author: wf

PropertyMapping

Represents a single column Wikidata property mapping.

Attributes:

Name Type Description
column Optional[str]

The column name in the data source; if None, the value is directly used.

propertyName str

The human-readable name of the property.

propertyId str

The Wikidata property ID (e.g., "P31").

propertyType str

The type of the property as a string; converted to an enum in post-init.

qualifierOf Optional[str]

Specifies if the property is a qualifier of another property.

valueLookupType Optional[Any]

The type (instance of/P31) of the property value for lookup if the value is not already a QID.

value Optional[Any]

The default value to set for the property.

varname Optional[str]

An optional variable name for internal use.

property_type_enum WdDatatype

The enum representation of the property type, initialized based on propertyType.

The `__post_init__` method ensures the propertyType is correctly interpreted and stored as both a string and an enum.

Source code in ez_wikidata/wdproperty.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
@lod_storable
class PropertyMapping:
    """
    Represents a single column Wikidata property mapping.

    Attributes:
        column (Optional[str]): The column name in the data source; if None, the value is directly used.
        propertyName (str): The human-readable name of the property.
        propertyId (str): The Wikidata property ID (e.g., "P31").
        propertyType (str): The type of the property as a string; converted to an enum in post-init.
        qualifierOf (Optional[str]): Specifies if the property is a qualifier of another property.
        valueLookupType (Optional[Any]): The type (instance of/P31) of the property value for lookup if the value is not already a QID.
        value (Optional[Any]): The default value to set for the property.
        varname (Optional[str]): An optional variable name for internal use.
        property_type_enum (WdDatatype): The enum representation of the property type, initialized based on propertyType.

    The __post_init__ method ensures the propertyType is correctly interpreted and stored as both a string and an enum.
    """

    column: Union[str, None]  # if None, the value is used
    propertyName: str
    propertyId: str
    propertyType: str
    qualifierOf: str = None
    valueLookupType: Any = None  # type (instance of/P31) of the property value, used to look up the QID if the value is not already a QID
    value: Any = None  # set this value for the property
    varname: str = None
    # property_type_enum: WdDatatype=field(init=False)

    def __post_init__(self):
        """
        Convert propertyType from string to WdDatatype enum if necessary

        Raises:
            ValueError: if propertyType is a string that is not the name of
                a WdDatatype member
        """
        self.property_type_enum = None
        if isinstance(self.propertyType, str):
            try:
                self.property_type_enum = WdDatatype[self.propertyType]
            except KeyError:
                raise ValueError(f"Invalid property type: {self.propertyType}")
        else:
            self.property_type_enum = self.propertyType
            # Ensure propertyType is stored as the correct string representation of the enum for YAML compatibility
            self.propertyType = self.property_type_enum.name

    @classmethod
    def get_legacy_mapping(cls) -> dict:
        """
        Returns the mapping from the old prop map keys to the new ones
        """
        return {
            "Column": "column",
            "PropertyName": "propertyName",
            "PropertyId": "propertyId",
            "Type": "propertyType",
            "Qualifier": "qualifierOf",
            "Lookup": "valueLookupType",
            "Value": "value",
            "PropVarname": "varname",
        }

    @classmethod
    def from_record(
        cls, wpm: WikidataPropertyManager, record: dict
    ) -> "PropertyMapping":
        """
        initialize PropertyMapping from the given record

        The record may use the current attribute names or the legacy keys
        listed by get_legacy_mapping. The given record is not modified.

        Args:
            wpm(WikidataPropertyManager): to be used for type lookup
            record(Dict): property mapping information

        Returns:
            PropertyMapping
        """
        legacy_lookup = cls.get_legacy_mapping()
        # work on a copy so the caller's record stays untouched
        record = record.copy()
        # mirror legacy keys under their current names; iterate over a
        # snapshot of the keys because the dict grows inside the loop
        for key in list(record):
            if key in legacy_lookup:
                record[legacy_lookup[key]] = record[key]
        # handle missing property type
        property_type = record.get("propertyType", None)
        if property_type in [None, ""]:
            # a lookup type or a fixed value marks the property as item valued
            has_lookup = record.get("valueLookupType", None) not in [None, ""]
            if has_lookup or record.get("value", None) not in [None, ""]:
                property_type = WdDatatype.itemid
        if property_type is not None and not isinstance(property_type, WdDatatype):
            if property_type in [wd.name for wd in WdDatatype]:
                property_type = WdDatatype[property_type]
            else:
                # not a known type name: ask the property manager for the
                # type of the property id and use it if the answer is unique
                pid = record.get("propertyId")
                props = wpm.get_properties_by_ids([pid])
                if len(props) == 1:
                    prop = list(props.values())[0]
                    property_type = prop.ptype
        mapping = PropertyMapping(
            column=record.get("column", None),
            propertyName=record.get("propertyName", None),
            propertyId=record.get("propertyId", None),
            propertyType=property_type,
            qualifierOf=record.get("qualifierOf", None),
            valueLookupType=record.get("valueLookupType", None),
            value=record.get("value", None),
            varname=record.get("varname", None),
        )
        return mapping

    def to_record(self) -> dict:
        """
        convert property mapping to its dict representation

        Returns:
            dict: the attribute values keyed by the legacy key names
        """
        key_map = self.get_legacy_mapping()
        return {
            old_key: getattr(self, new_key, None)
            for old_key, new_key in key_map.items()
        }

    def is_qualifier(self) -> bool:
        """
        Returns true if the property mapping describes a qualifier
        """
        is_qualifier = not (self.qualifierOf is None or self.qualifierOf == "")
        return is_qualifier

    @classmethod
    def getDefaultItemPropertyMapping(cls) -> "PropertyMapping":
        """
        get the defaultItemPropertyMapping

        The mapping is created on first use and then kept as a class attribute.
        """
        if not hasattr(cls, "defaultItemPropertyMapping"):
            item_prop_map = PropertyMapping(
                column="item",
                propertyName="item",
                propertyId="",
                propertyType=WdDatatype.item,
                varname="item",
            )
            cls.defaultItemPropertyMapping = item_prop_map
        return cls.defaultItemPropertyMapping

    def is_item_itself(self) -> bool:
        """
        Check if the property_type is an item

        Returns:
            bool: True if the property mapping links to the existing item
        """
        is_item_id = self.property_type_enum == WdDatatype.item
        return is_item_id

    @classmethod
    def get_qualifier_lookup(
        cls, properties: List["PropertyMapping"]
    ) -> Dict[str, List["PropertyMapping"]]:
        """
        Get a lookup for a property and all its qualifier

        Entries that are not PropertyMapping instances and mappings without
        a qualifierOf value are skipped.

        Args:
            properties: property mappings to generate the lookup from

        Returns:
             dict as property qualifier lookup
        """
        res = dict()
        for pm in properties:
            if not isinstance(pm, PropertyMapping):
                continue
            if pm.qualifierOf is None or pm.qualifierOf == "":
                continue
            res.setdefault(pm.qualifierOf, []).append(pm)
        return res

    @classmethod
    def get_item_mapping(
        cls, property_mappings: List["PropertyMapping"]
    ) -> "PropertyMapping":
        """
        get the property mapping that is used for the default "item" primary key
        if no property is defined use the default "item" mapping
        """
        for pm in property_mappings:
            if pm.is_item_itself():
                return pm
        pm = cls.getDefaultItemPropertyMapping()
        return pm

__post_init__()

Convert propertyType from string to WdDatatype enum if necessary

Source code in ez_wikidata/wdproperty.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def __post_init__(self):
    """
    Keep propertyType (string) and property_type_enum (enum) in sync.

    A string propertyType is resolved to the WdDatatype member of that name.
    Any other value is taken as the enum itself, and propertyType is replaced
    by its name so that the stored value is a plain string (YAML
    compatibility).

    Raises:
        ValueError: if the string is not the name of a WdDatatype member
    """
    declared_type = self.propertyType
    self.property_type_enum = None
    if not isinstance(declared_type, str):
        self.property_type_enum = declared_type
        self.propertyType = self.property_type_enum.name
        return
    try:
        self.property_type_enum = WdDatatype[declared_type]
    except KeyError:
        raise ValueError(f"Invalid property type: {self.propertyType}")

from_record(wpm, record) classmethod

initialize PropertyMapping from the given record

Args: wpm (WikidataPropertyManager): to be used for type lookup; record (Dict): property mapping information

Returns:

Type Description
PropertyMapping

PropertyMapping

Source code in ez_wikidata/wdproperty.py
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
@classmethod
def from_record(
    cls, wpm: WikidataPropertyManager, record: dict
) -> "PropertyMapping":
    """
    initialize PropertyMapping from the given record

    The record may use the current attribute names or the legacy keys
    listed by get_legacy_mapping. The given record is not modified.

    Args:
        wpm(WikidataPropertyManager): to be used for type lookup
        record(Dict): property mapping information

    Returns:
        PropertyMapping
    """
    legacy_lookup = cls.get_legacy_mapping()
    # work on a copy so the caller's record stays untouched
    record = record.copy()
    # mirror legacy keys under their current names; iterate over a
    # snapshot of the keys because the dict grows inside the loop
    for key in list(record):
        if key in legacy_lookup:
            record[legacy_lookup[key]] = record[key]
    # handle missing property type
    property_type = record.get("propertyType", None)
    if property_type in [None, ""]:
        # a lookup type or a fixed value marks the property as item valued
        has_lookup = record.get("valueLookupType", None) not in [None, ""]
        if has_lookup or record.get("value", None) not in [None, ""]:
            property_type = WdDatatype.itemid
    if property_type is not None and not isinstance(property_type, WdDatatype):
        if property_type in [wd.name for wd in WdDatatype]:
            property_type = WdDatatype[property_type]
        else:
            # not a known type name: ask the property manager for the type
            # of the property id and use it if the answer is unique
            pid = record.get("propertyId")
            props = wpm.get_properties_by_ids([pid])
            if len(props) == 1:
                prop = list(props.values())[0]
                property_type = prop.ptype
    mapping = PropertyMapping(
        column=record.get("column", None),
        propertyName=record.get("propertyName", None),
        propertyId=record.get("propertyId", None),
        propertyType=property_type,
        qualifierOf=record.get("qualifierOf", None),
        valueLookupType=record.get("valueLookupType", None),
        value=record.get("value", None),
        varname=record.get("varname", None),
    )
    return mapping

getDefaultItemPropertyMapping() classmethod

get the defaultItemPropertyMapping

Source code in ez_wikidata/wdproperty.py
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
@classmethod
def getDefaultItemPropertyMapping(cls) -> "PropertyMapping":
    """
    Return the shared default mapping for the "item" column.

    The mapping is built on first use and stored on the class as
    defaultItemPropertyMapping; later calls return that stored object.
    """
    if hasattr(cls, "defaultItemPropertyMapping"):
        return cls.defaultItemPropertyMapping
    cls.defaultItemPropertyMapping = PropertyMapping(
        column="item",
        propertyName="item",
        propertyId="",
        propertyType=WdDatatype.item,
        varname="item",
    )
    return cls.defaultItemPropertyMapping

get_item_mapping(property_mappings) classmethod

Get the property mapping that is used for the default "item" primary key. If no such property is defined, the default "item" mapping is used.

Source code in ez_wikidata/wdproperty.py
599
600
601
602
603
604
605
606
607
608
609
610
611
@classmethod
def get_item_mapping(
    cls, property_mappings: List["PropertyMapping"]
) -> "PropertyMapping":
    """
    Pick the mapping that serves as the "item" primary key.

    Returns the first mapping for which is_item_itself() is true; if there
    is none, the default "item" mapping is returned instead.
    """
    first_item_mapping = next(
        (candidate for candidate in property_mappings if candidate.is_item_itself()),
        None,
    )
    if first_item_mapping is None:
        return cls.getDefaultItemPropertyMapping()
    return first_item_mapping

get_legacy_mapping() classmethod

Returns the mapping from the old prop map keys to the new ones

Source code in ez_wikidata/wdproperty.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
@classmethod
def get_legacy_mapping(cls) -> dict:
    """
    Return the translation table from legacy record keys to the current
    attribute names (legacy key -> attribute name).
    """
    legacy_to_current = (
        ("Column", "column"),
        ("PropertyName", "propertyName"),
        ("PropertyId", "propertyId"),
        ("Type", "propertyType"),
        ("Qualifier", "qualifierOf"),
        ("Lookup", "valueLookupType"),
        ("Value", "value"),
        ("PropVarname", "varname"),
    )
    return dict(legacy_to_current)

get_qualifier_lookup(properties) classmethod

Get a lookup for a property and all its qualifier

Parameters:

Name Type Description Default
properties List[PropertyMapping]

property mappings to generate the lookup from

required

Returns:

Type Description
Dict[str, List[PropertyMapping]]

dict as property qualifier lookup

Source code in ez_wikidata/wdproperty.py
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
@classmethod
def get_qualifier_lookup(
    cls, properties: List["PropertyMapping"]
) -> Dict[str, List["PropertyMapping"]]:
    """
    Group qualifier mappings by the property they qualify.

    Entries that are not PropertyMapping instances and mappings whose
    qualifierOf is None or empty are left out.

    Args:
        properties: property mappings to generate the lookup from

    Returns:
         dict from qualifierOf value to the list of its qualifier mappings
    """
    lookup = {}
    for candidate in properties:
        if not isinstance(candidate, PropertyMapping):
            continue
        qualified = candidate.qualifierOf
        if qualified is not None and qualified != "":
            lookup.setdefault(qualified, []).append(candidate)
    return lookup

is_item_itself()

Check if the property_type is an item

Returns:

Name Type Description
bool bool

True if the property mapping links to the existing item

Source code in ez_wikidata/wdproperty.py
563
564
565
566
567
568
569
570
571
def is_item_itself(self) -> bool:
    """
    Tell whether this mapping has the special "item" datatype.

    Returns:
        bool: True if property_type_enum equals WdDatatype.item, i.e. the
            mapping links to the existing item
    """
    return self.property_type_enum == WdDatatype.item

is_qualifier()

Returns true if the property mapping describes a qualifier

Source code in ez_wikidata/wdproperty.py
540
541
542
543
544
545
def is_qualifier(self) -> bool:
    """
    Tell whether this mapping qualifies another property.

    Returns:
        bool: True if qualifierOf is neither None nor the empty string
    """
    return self.qualifierOf is not None and self.qualifierOf != ""

to_record()

convert property mapping to its dict representation

Source code in ez_wikidata/wdproperty.py
530
531
532
533
534
535
536
537
538
def to_record(self) -> dict:
    """
    Export this mapping as a dict keyed by the legacy key names.

    Returns:
        dict: one entry per legacy key; attributes that do not exist on
            the instance are reported as None
    """
    return {
        legacy_key: getattr(self, attribute, None)
        for legacy_key, attribute in self.get_legacy_mapping().items()
    }

PropertyMappings dataclass

A collection of Wikidata property mappings, with metadata.

Source code in ez_wikidata/wdproperty.py
614
615
616
617
618
619
620
621
622
623
624
@lod_storable
@dataclass
class PropertyMappings:
    """
    A collection of Wikidata property mappings, with metadata.

    Only name is mandatory; the other fields default to an empty
    collection or to None.
    """

    # name of this collection of mappings (no default, must be given)
    name: str
    # the PropertyMapping entries by string key; every instance starts with
    # its own empty dict. NOTE(review): which value is used as the key is
    # not visible here - confirm against the code that fills this dict
    mappings: Dict[str, PropertyMapping] = field(default_factory=dict)
    # optional free-text description of the collection
    description: Optional[str] = None
    # optional url belonging to the collection
    url: Optional[str] = None

Variable

Variable e.g. name handling

Source code in ez_wikidata/wdproperty.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class Variable:
    """
    Variable e.g. name handling
    """

    @classmethod
    def validVarName(cls, varStr: str) -> str:
        """
        convert the given potential variable name string to a valid
        variable name

        Every character that is not a word character is replaced by an
        underscore, and an underscore is put in front of a leading digit.

        see https://stackoverflow.com/a/3305731/1497139

        Args:
            varStr(str): the string to convert

        Returns:
            str: a valid variable name
        """
        # raw string: \W and \d are regex escapes, not Python string escapes
        return re.sub(r"\W|^(?=\d)", "_", varStr)

validVarName(varStr) classmethod

convert the given potential variable name string to a valid variable name

see https://stackoverflow.com/a/3305731/1497139

Parameters:

Name Type Description Default
varStr(str)

the string to convert

required

Returns:

Name Type Description
str str

a valid variable name

Source code in ez_wikidata/wdproperty.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@classmethod
def validVarName(cls, varStr: str) -> str:
    """
    convert the given potential variable name string to a valid
    variable name

    Every character that is not a word character is replaced by an
    underscore, and an underscore is put in front of a leading digit.

    see https://stackoverflow.com/a/3305731/1497139

    Args:
        varStr(str): the string to convert

    Returns:
        str: a valid variable name
    """
    # raw string: \W and \d are regex escapes, not Python string escapes
    return re.sub(r"\W|^(?=\d)", "_", varStr)

WdDatatype

Bases: Enum

Supported Wikidata datatypes, sorted by frequency and including special cases.

Source code in ez_wikidata/wdproperty.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class WdDatatype(Enum):
    """
    The Wikidata datatypes this package supports.

    The regular datatypes come first, ordered by how often they occur,
    followed by a few special cases that have no wikibase type name.
    """

    # regular datatypes with their wikibase type name and occurrence count
    extid = auto()  # ExternalId (8645)
    itemid = auto()  # WikibaseItem (1634)
    quantity = auto()  # Quantity (652)
    string = auto()  # String (329)
    url = auto()  # Url (107)
    commons_media = auto()  # CommonsMedia (79)
    time = auto()  # Time (66)
    text = auto()  # Monolingualtext (62)
    math = auto()  # Math (36)
    wikibase_property = auto()  # WikibaseProperty (21)
    wikibase_sense = auto()  # WikibaseSense (19)
    wikibase_lexeme = auto()  # WikibaseLexeme (17)
    globe_coordinate = auto()  # GlobeCoordinate (11)
    wikibase_form = auto()  # WikibaseForm (9)
    musical_notation = auto()  # MusicalNotation (6)
    tabular_data = auto()  # TabularData (6)
    geoshape = auto()  # GeoShape (3)
    # special cases
    item = auto()  # Item
    year = auto()  # Year
    date = auto()  # Date

    @classmethod
    def from_wb_type_name(cls, wb_type_name: str) -> "WdDatatype":
        """
        Translate a wikibase type name into the matching WdDatatype.

        Args:
            wb_type_name(str): the string name of the wikibase type (with or without wikibase ontology prefix)

        Returns:
            WdDatatype: the matching member, or string for an unknown name
        """
        short_name = wb_type_name.replace("http://wikiba.se/ontology#", "")
        by_wikibase_name = {
            "ExternalId": cls.extid,
            "WikibaseItem": cls.itemid,
            "Quantity": cls.quantity,
            "String": cls.string,
            "Url": cls.url,
            "CommonsMedia": cls.commons_media,
            "Time": cls.time,
            "Monolingualtext": cls.text,
            "Math": cls.math,
            "WikibaseProperty": cls.wikibase_property,
            "WikibaseSense": cls.wikibase_sense,
            "WikibaseLexeme": cls.wikibase_lexeme,
            "GlobeCoordinate": cls.globe_coordinate,
            "WikibaseForm": cls.wikibase_form,
            "MusicalNotation": cls.musical_notation,
            "TabularData": cls.tabular_data,
            "GeoShape": cls.geoshape,
        }
        if short_name in by_wikibase_name:
            return by_wikibase_name[short_name]
        return cls.string

    @classmethod
    def _missing_(cls, _value):
        """
        Fallback for values that match no member: the text datatype.
        """
        return cls.text

    @classmethod
    def get_by_wikibase(cls, property_type: str) -> Union["WdDatatype", None]:
        """
        Look up the WdDatatype for a wikibase datatype name.

        Args:
            property_type: wikibase name of the type

        Returns:
            WdDatatype: the matching member, or None if the name is not
                one of the six names handled here
        """
        candidates = dict(
            WikibaseItem=cls.itemid,
            Time=cls.date,
            Monolingualtext=cls.text,
            String=cls.string,
            ExternalId=cls.extid,
            Url=cls.url,
        )
        return candidates.get(property_type)

from_wb_type_name(wb_type_name) classmethod

convert a wikibase type name to a WdDatatype

Parameters:

Name Type Description Default
wb_type_name(str)

the string name of the wikibase type (with or without wikibase ontology prefix)

required
Source code in ez_wikidata/wdproperty.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@classmethod
def from_wb_type_name(cls, wb_type_name: str) -> "WdDatatype":
    """
    Translate a wikibase type name into the matching WdDatatype.

    Args:
        wb_type_name(str): the string name of the wikibase type (with or without wikibase ontology prefix)

    Returns:
        WdDatatype: the matching member, or string for an unknown name
    """
    fallback = WdDatatype.string
    short_name = wb_type_name.replace("http://wikiba.se/ontology#", "")
    by_wikibase_name = {
        "ExternalId": cls.extid,
        "WikibaseItem": cls.itemid,
        "Quantity": cls.quantity,
        "String": cls.string,
        "Url": cls.url,
        "CommonsMedia": cls.commons_media,
        "Time": cls.time,
        "Monolingualtext": cls.text,
        "Math": cls.math,
        "WikibaseProperty": cls.wikibase_property,
        "WikibaseSense": cls.wikibase_sense,
        "WikibaseLexeme": cls.wikibase_lexeme,
        "GlobeCoordinate": cls.globe_coordinate,
        "WikibaseForm": cls.wikibase_form,
        "MusicalNotation": cls.musical_notation,
        "TabularData": cls.tabular_data,
        "GeoShape": cls.geoshape,
    }
    return by_wikibase_name.get(short_name, fallback)

get_by_wikibase(property_type) classmethod

Get WdDatatype by the corresponding wikibase datatype

Args: property_type: wikibase name of the type

Returns:

Type Description
Union[WdDatatype, None]

WdDatatype

Source code in ez_wikidata/wdproperty.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@classmethod
def get_by_wikibase(cls, property_type: str) -> Union["WdDatatype", None]:
    """
    Look up the WdDatatype for a wikibase datatype name.

    Args:
        property_type: wikibase name of the type

    Returns:
        WdDatatype: the matching member, or None if the name is not one
            of the six names handled here
    """
    candidates = dict(
        WikibaseItem=cls.itemid,
        Time=cls.date,
        Monolingualtext=cls.text,
        String=cls.string,
        ExternalId=cls.extid,
        Url=cls.url,
    )
    return candidates.get(property_type)

WikidataProperty

Represents a Wikidata Property.

Source code in ez_wikidata/wdproperty.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@lod_storable
class WikidataProperty:
    """
    A single Wikidata property together with its derived names.

    The declared fields are completed in __post_init__ by the calculated
    attributes url, ptype, varname, valueVarname and labelVarname.
    """
    id: str  # identifier of the record: pid + lang
    pid: str  # the property ID
    lang: str
    plabel: str  # label of the property
    description: str  # description of the property
    type_name: str  # name of the property's type
    reverse: bool = False  # True if the property is used in reverse direction
    # calculated in __post_init__:
    # varname: str = field(init=False)
    # valueVarname: str = field(init=False)
    # labelVarname: str = field(init=False)
    # ptype: WdDatatype = field(init=False)

    def __post_init__(self):
        """
        Normalize the pid and fill in the calculated attributes.
        """
        # stripping the entity prefix is no longer required but harmless
        self.pid = self.pid.replace("http://www.wikidata.org/entity/", "")
        self.url = f"https://www.wikidata.org/wiki/Property:{self.pid}"
        self.ptype = WdDatatype.from_wb_type_name(self.type_name)
        self.varname = Variable.validVarName(self.plabel)
        if "WikibaseItem" in self.type_name:
            # item valued properties get a dedicated value variable
            self.valueVarname = f"{self.varname}Item"
        else:
            self.valueVarname = self.varname
        self.labelVarname = self.varname

    def getPredicate(self):
        """
        Return this property as a wdt: predicate, prefixed with "^" when
        it is used in reverse direction.
        """
        if self.reverse:
            direction = "^"
        else:
            direction = ""
        return f"{direction}wdt:{self.pid}"

    def __str__(self):
        if not hasattr(self, "plabel"):
            return self.pid
        return f"{self.plabel} ({self.pid})"

__post_init__()

creates and modifies calculated fields

Source code in ez_wikidata/wdproperty.py
147
148
149
150
151
152
153
154
155
156
157
158
159
def __post_init__(self):
    """
    Normalize the pid and fill in the calculated attributes
    (url, ptype, varname, valueVarname, labelVarname).
    """
    # stripping the entity prefix is no longer required but harmless
    self.pid = self.pid.replace("http://www.wikidata.org/entity/", "")
    self.url = f"https://www.wikidata.org/wiki/Property:{self.pid}"
    self.ptype = WdDatatype.from_wb_type_name(self.type_name)
    self.varname = Variable.validVarName(self.plabel)
    if "WikibaseItem" in self.type_name:
        # item valued properties get a dedicated value variable
        self.valueVarname = f"{self.varname}Item"
    else:
        self.valueVarname = self.varname
    self.labelVarname = self.varname

getPredicate()

get me as a Predicate

Source code in ez_wikidata/wdproperty.py
161
162
163
164
165
166
167
def getPredicate(self):
    """
    Return this property as a wdt: predicate, prefixed with "^" when it is
    used in reverse direction.
    """
    if self.reverse:
        direction = "^"
    else:
        direction = ""
    return f"{direction}wdt:{self.pid}"

WikidataPropertyManager

handle Wikidata Properties

Source code in ez_wikidata/wdproperty.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
class WikidataPropertyManager:
    """
    handle Wikidata Properties

    Loads property labels and descriptions for a list of languages, either
    from a local SQL cache file or via SPARQL, and offers lookups by label
    and by property id.
    """

    def __init__(
        self,
        endpoint_url: str = "https://qlever.cs.uni-freiburg.de/api/wikidata",
        langs: List[str] = ["en", "zh", "hi", "de", "fr", "ar", "es", "bn", "ru"],
        with_load: bool = True,
        profile: bool = True,
        debug: bool = False,
    ):
        """
        initialize the lookups

        Args:
            endpoint_url(str): the SPARQL endpoint to query
            langs(List[str]): the languages to handle - must contain "en"
            with_load(bool): if True load the properties immediately
            profile(bool): passed on to the Profiler instances
            debug(bool): passed on to the SPARQL access

        Raises:
            ValueError: if "en" is not part of langs
        """
        if "en" not in langs:
            raise ValueError(f"en is mandatory in langs -{langs}")
        # copy so that neither the shared default list nor the caller's list
        # is aliased by this instance
        self.langs = list(langs)
        self.debug = debug
        self.profile = profile
        self.sparql = SPARQL(endpoint_url, debug=self.debug)
        self.sql_db_path = WikidataPropertyManager.get_cache_path()
        self.sql_db = SQLDB(self.sql_db_path)
        self.sparql_query = self.get_query_for_langs(langs)
        self.props = []
        self.props_by_id = {}
        self.props_by_lang = {}
        self.loaded = False
        if with_load:
            self.load()

    def load_from_sparql(self):
        """
        get my list of dicts from sparql
        """
        profiler = Profiler(
            f"getting wikidata properties for {len(self.langs)} languages via SPARQL",
            profile=self.profile,
        )
        self.lod = self.sparql.queryAsListOfDicts(self.sparql_query)
        profiler.time()

    def store(self):
        """
        store my list of dicts in the "wd_properties" table of the SQL cache
        """
        profiler = Profiler("caching wikidata properties to SQL", profile=self.profile)
        self.entity_info = self.sql_db.createTable(
            listOfRecords=self.lod,
            entityName="wd_properties",
            primaryKey="id",
            withCreate=True,
            withDrop=True,
            sampleRecordCount=100,
        )
        self.sql_db.store(
            listOfRecords=self.lod,
            entityInfo=self.entity_info,
            executeMany=True,
            fixNone=True,
        )
        profiler.time()

    def load_from_sql(self):
        """
        load my list of dicts from the "wd_properties" table of the SQL cache
        """
        profiler = Profiler("loading wikidata properties from SQL", profile=self.profile)
        sql_query = "SELECT * FROM wd_properties"
        self.lod = self.sql_db.query(sql_query)
        profiler.time()

    def load(self):
        """
        load the properties

        Uses the SQL cache when the cache file exists and is not empty.
        Otherwise the records are queried via SPARQL, normalized (entity
        prefix stripped from "pid", "id" set to "<pid>-<lang>") and stored
        in the cache. Does nothing when the properties are already loaded.
        """
        if self.loaded:
            return
        if os.path.isfile(self.sql_db_path) and os.stat(self.sql_db_path).st_size > 0:
            self.load_from_sql()
        else:
            self.load_from_sparql()
            for record in self.lod:
                pid = record["pid"]
                lang = record["lang"]
                pid = pid.replace("http://www.wikidata.org/entity/", "")
                record["pid"] = pid
                # the same pid occurs once per language, so the primary key
                # combines both
                record["id"] = f"{pid}-{lang}"
            self.store()
        self.init_props()
        self.loaded = True

    def init_props(self):
        """
        initialize my property structures from my list of dicts

        Raises:
            Exception: if there are no property records
        """
        self.props = []
        self.props_by_id = {}
        self.props_by_lang = {}
        if not self.lod:
            raise Exception(f"Could not fetch wikidata properties for {self.langs}")
        for record in self.lod:
            prop = WikidataProperty(**record)
            self.props.append(prop)
        for lang in self.langs:
            self.props_by_lang[lang] = {}
            self.props_by_id[lang] = {}
        for prop in self.props:
            # the cache file is shared and may contain languages that are not
            # in self.langs - setdefault avoids a KeyError for those records
            self.props_by_lang.setdefault(prop.lang, {})[prop.plabel] = prop
            self.props_by_id.setdefault(prop.lang, {})[prop.pid] = prop

    def get_mappings_for_records(
        self, prop_mapping_records: Dict[str, dict]
    ) -> List["PropertyMapping"]:
        """
        convert given property mapping records to a list of PropertyMappings

        Args:
            prop_mapping_records: records to convert (only the dict values are used)

        Returns:
            property mappings
        """
        mappings = []
        for record in prop_mapping_records.values():
            mapping = PropertyMapping.from_record(self, record)
            mappings.append(mapping)
        return mappings

    def get_query_for_langs(self, langs: list = None) -> str:
        """
        Get the SPARQL query for the given list of langs.

        Args:
            langs: the languages to query - defaults to my langs

        Returns:
            str: a query with one UNION branch per language
        """
        query_prefix = Prefixes.getPrefixes(["wikibase", "rdfs", "schema"])
        query_body = ""
        if langs is None:
            langs = self.langs
        for lang in langs:
            if query_body:  # If not the first iteration, add UNION
                query_body += "\n  UNION"
            query_body += f"""
  {{ # wikidata properties with {lang} labels and descriptions
    ?property a wikibase:Property;
    rdfs:label ?propertyLabel;
    schema:description ?propertyDescription;
    wikibase:propertyType ?wbType.
    FILTER(LANG(?propertyLabel) = "{lang}") .
    FILTER(LANG(?propertyDescription) = "{lang}") .
    BIND("{lang}" AS ?lang)
  }}"""
        query = (
            query_prefix + "SELECT \n"
            "  (STR(?property) AS ?pid)\n"
            "  ?lang\n"
            "  (?propertyLabel AS ?plabel)\n"
            "  (?propertyDescription AS ?description)\n"
            "  (STR(?wbType) AS ?type_name)\n"
            "WHERE {" + query_body + "\n}\n"
        )
        return query

    @classmethod
    def get_instance(
        cls,
        endpoint_url: str = "https://qlever.cs.uni-freiburg.de/api/wikidata",
    ) -> "WikidataPropertyManager":
        """
        get the shared wikidata property manager, creating it on first use

        Args:
            endpoint_url(str): the SPARQL endpoint used when the shared
                instance is created; ignored once the instance exists

        Returns:
            WikidataPropertyManager: the instance stored in the class attribute ``wpm``
        """
        if not hasattr(cls, "wpm"):
            cls.wpm = WikidataPropertyManager(endpoint_url)
        return cls.wpm

    @classmethod
    def get_cache_path(cls, lang: str = "en") -> str:
        """
        get the path of the SQL cache file, creating its directory if needed

        Args:
            lang(str): currently unused

        Returns:
            str: <home>/.wikidata/wikidata_properties.db
        """
        home = str(Path.home())
        cache_dir = f"{home}/.wikidata"
        os.makedirs(cache_dir, exist_ok=True)
        cache_path = f"{cache_dir}/wikidata_properties.db"
        return cache_path

    def get_properties_by_labels(
        self, labels: List[str], lang: str = "en"
    ) -> Dict[str, "WikidataProperty"]:
        """
        Get properties by their labels for a specific language.

        Args:
            labels: List of property labels to search for.
            lang: the language to match with
        Returns:
            A dictionary of {label: WikidataProperty} for found properties.
            Empty when the language is not available.
        """
        # a language without loaded properties simply matches nothing
        props_for_lang = self.props_by_lang.get(lang, {})
        matched_properties = {}
        for label in labels:
            if label in props_for_lang:
                matched_properties[label] = props_for_lang[label]
        return matched_properties

    def get_properties_by_ids(
        self, ids: List[str], lang: str = "en"
    ) -> Dict[str, Optional["WikidataProperty"]]:
        """
        Get properties by their IDs for a specific language.

        Args:
            ids: List of property IDs to search for.
            lang(str): the language

        Returns:
            A dictionary of {property ID: WikidataProperty} for the found
            properties. IDs that are found neither for the requested
            language nor for English are omitted.
        """
        props_for_lang = self.props_by_id.get(lang, {})
        props_for_en = self.props_by_id.get("en", {})
        matched_properties = {}
        for pid in ids:
            # first check requested language
            if pid in props_for_lang:
                matched_properties[pid] = props_for_lang[pid]
            elif pid in props_for_en:
                # fall back to english - must use the id index, the label
                # index is keyed by label and never contains a pid
                matched_properties[pid] = props_for_en[pid]
        return matched_properties

    def get_property_by_id(self, property_id: str) -> Optional["WikidataProperty"]:
        """
        lookup a WikidataProperty for the given property_id

        Args:
            property_id(str): a property ID e.g. "P6375"

        Returns:
            the matching WikidataProperty or None if there is no match

        Raises:
            ValueError: if the lookup yields more than one result
        """
        properties = self.get_properties_by_ids([property_id])
        prop_count = len(properties)
        if prop_count == 1:
            return list(properties.values())[0]
        elif prop_count == 0:
            return None
        else:
            property_labels = list(properties.keys())
            msg = f"unexpected get_property_by_id result for property id {property_id}. Expected 0 or 1 results bot got:{property_labels}"
            raise ValueError(msg)

__init__(endpoint_url='https://qlever.cs.uni-freiburg.de/api/wikidata', langs=['en', 'zh', 'hi', 'de', 'fr', 'ar', 'es', 'bn', 'ru'], with_load=True, profile=True, debug=False)

initialize the lookups

Source code in ez_wikidata/wdproperty.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def __init__(
    self,
    endpoint_url: str = "https://qlever.cs.uni-freiburg.de/api/wikidata",
    langs: List[str] = ["en", "zh", "hi", "de", "fr", "ar", "es", "bn", "ru"],
    with_load: bool = True,
    profile: bool = True,
    debug: bool = False,
):
    """
    initialize the lookups

    Args:
        endpoint_url(str): the SPARQL endpoint to query
        langs(List[str]): the languages to handle - must contain "en"
        with_load(bool): if True load the properties immediately
        profile(bool): passed on to the Profiler instances
        debug(bool): passed on to the SPARQL access

    Raises:
        ValueError: if "en" is not part of langs
    """
    if "en" not in langs:
        raise ValueError(f"en is mandatory in langs -{langs}")
    # copy so that neither the shared default list nor the caller's list
    # is aliased by this instance
    self.langs = list(langs)
    self.debug = debug
    self.profile = profile
    self.sparql = SPARQL(endpoint_url, debug=self.debug)
    self.sql_db_path = WikidataPropertyManager.get_cache_path()
    self.sql_db = SQLDB(self.sql_db_path)
    self.sparql_query = self.get_query_for_langs(langs)
    self.props = []
    self.props_by_id = {}
    self.props_by_lang = {}
    self.loaded = False
    if with_load:
        self.load()

get_instance(endpoint_url='https://qlever.cs.uni-freiburg.de/api/wikidata') classmethod

initialize the wikidata property manager

Parameters:

Name Type Description Default
endpoint_url(str)

the SPARQL endpoint to query if there is no cache available

required
lang(str)

the languages to query property labels and descriptions for

required
Source code in ez_wikidata/wdproperty.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
@classmethod
def get_instance(
    cls,
    endpoint_url: str = "https://qlever.cs.uni-freiburg.de/api/wikidata",
) -> "WikidataPropertyManager":
    """
    get the shared wikidata property manager, creating it on first use

    Args:
        endpoint_url(str): the SPARQL endpoint handed to the manager when the
            shared instance is created; ignored once the instance exists

    Returns:
        WikidataPropertyManager: the instance kept in the class attribute ``wpm``
    """
    if hasattr(cls, "wpm"):
        return cls.wpm
    # first call: create the manager and keep it on the class
    cls.wpm = WikidataPropertyManager(endpoint_url)
    return cls.wpm

get_mappings_for_records(prop_mapping_records)

Convert the given property mapping records to a list of PropertyMappings.

Args: prop_mapping_records – the records to convert

Returns:

Type Description
List[PropertyMapping]

property mappings

Source code in ez_wikidata/wdproperty.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def get_mappings_for_records(
    self, prop_mapping_records: Dict[str, dict]
) -> List["PropertyMapping"]:
    """
    turn property mapping records into PropertyMapping objects

    Args:
        prop_mapping_records: the records to convert; only the dict values
            are used, in iteration order

    Returns:
        one PropertyMapping per record
    """
    return [
        PropertyMapping.from_record(self, mapping_record)
        for mapping_record in prop_mapping_records.values()
    ]

get_properties_by_ids(ids, lang='en')

Get properties by their IDs for a specific language.

Parameters:

Name Type Description Default
ids List[str]

List of property IDs to search for.

required
lang(str)

the language

required

Returns:

Type Description
Dict[str, Optional[WikidataProperty]]

A dictionary of {property ID: WikidataProperty} for the properties that were found; IDs without a match are omitted.

Source code in ez_wikidata/wdproperty.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
def get_properties_by_ids(
    self, ids: List[str], lang: str = "en"
) -> Dict[str, Optional["WikidataProperty"]]:
    """
    Get properties by their IDs for a specific language.

    Args:
        ids: List of property IDs to search for.
        lang(str): the language

    Returns:
        A dictionary of {property ID: WikidataProperty} for the found
        properties. IDs that are found neither for the requested language
        nor for English are omitted.
    """
    props_for_lang = self.props_by_id.get(lang, {})
    props_for_en = self.props_by_id.get("en", {})
    matched_properties = {}
    for pid in ids:
        # first check requested language
        if pid in props_for_lang:
            matched_properties[pid] = props_for_lang[pid]
        elif pid in props_for_en:
            # fall back to english - must use the id index, the label index
            # is keyed by label and never contains a pid
            matched_properties[pid] = props_for_en[pid]
    return matched_properties

get_properties_by_labels(labels, lang='en')

Get properties by their labels for a specific language.

Parameters:

Name Type Description Default
labels List[str]

List of property labels to search for.

required
lang str

the language to match with

'en'

Returns: A dictionary of {label: WikidataProperty} for found properties.

Source code in ez_wikidata/wdproperty.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def get_properties_by_labels(
    self, labels: List[str], lang: str = "en"
) -> Dict[str, "WikidataProperty"]:
    """
    Get properties by their labels for a specific language.

    Args:
        labels: List of property labels to search for.
        lang: the language to match with
    Returns:
        A dictionary of {label: WikidataProperty} for found properties.
        Empty when the language is not available.
    """
    # a language without loaded properties simply matches nothing instead of
    # raising a KeyError
    props_for_lang = self.props_by_lang.get(lang, {})
    return {
        label: props_for_lang[label] for label in labels if label in props_for_lang
    }

get_property_by_id(property_id)

lookup a WikidataProperty for the given property_id

Parameters:

Name Type Description Default
property_id(str)

a property ID e.g. "P6375"

required
Source code in ez_wikidata/wdproperty.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def get_property_by_id(self, property_id: str) -> Optional["WikidataProperty"]:
    """
    lookup a WikidataProperty for the given property_id

    Args:
        property_id(str): a property ID e.g. "P6375"

    Returns:
        the matching WikidataProperty or None if there is no match

    Raises:
        ValueError: if the lookup yields more than one result
    """
    properties = self.get_properties_by_ids([property_id])
    prop_count = len(properties)
    if prop_count == 0:
        return None
    if prop_count == 1:
        return next(iter(properties.values()))
    property_labels = list(properties.keys())
    msg = f"unexpected get_property_by_id result for property id {property_id}. Expected 0 or 1 results bot got:{property_labels}"
    raise ValueError(msg)

get_query_for_langs(langs=None)

Get the SPARQL query for the given list of langs.

Source code in ez_wikidata/wdproperty.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
  def get_query_for_langs(self, langs: list = None) -> str:
      """
      Get the SPARQL query for the given list of langs.
      """
      query_prefix = Prefixes.getPrefixes(["wikibase", "rdfs", "schema"])
      query_body = ""
      if langs is None:
          langs = self.langs
      for lang in langs:
          if query_body:  # If not the first iteration, add UNION
              query_body += "\n  UNION"
          query_body += f"""
{{ # wikidata properties with {lang} labels and descriptions
  ?property a wikibase:Property;
  rdfs:label ?propertyLabel;
  schema:description ?propertyDescription;
  wikibase:propertyType ?wbType.
  FILTER(LANG(?propertyLabel) = "{lang}") .
  FILTER(LANG(?propertyDescription) = "{lang}") .
  BIND("{lang}" AS ?lang)
}}"""
      query = (
          query_prefix + "SELECT \n"
          "  (STR(?property) AS ?pid)\n"
          "  ?lang\n"
          "  (?propertyLabel AS ?plabel)\n"
          "  (?propertyDescription AS ?description)\n"
          "  (STR(?wbType) AS ?type_name)\n"
          "WHERE {" + query_body + "\n}\n"
      )
      return query

init_props()

initialize my property structures

Source code in ez_wikidata/wdproperty.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def init_props(self):
    """
    initialize my property structures from my list of dicts

    Raises:
        Exception: if there are no property records
    """
    self.props = []
    self.props_by_id = {}
    self.props_by_lang = {}
    if not self.lod:
        raise Exception(f"Could not fetch wikidata properties for {self.langs}")
    for record in self.lod:
        prop = WikidataProperty(**record)
        self.props.append(prop)
    for lang in self.langs:
        self.props_by_lang[lang] = {}
        self.props_by_id[lang] = {}
    for prop in self.props:
        # records may carry a language that is not in self.langs (e.g. when
        # they come from a cache written for other languages) - setdefault
        # avoids a KeyError for those
        self.props_by_lang.setdefault(prop.lang, {})[prop.plabel] = prop
        self.props_by_id.setdefault(prop.lang, {})[prop.pid] = prop

load()

load the properties

Source code in ez_wikidata/wdproperty.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def load(self):
    """
    load the properties once

    Reads from the SQL cache when the cache file exists and is not empty.
    Otherwise the records are fetched via SPARQL, normalized and written to
    the cache. Afterwards the lookup structures are initialized.
    """
    if self.loaded:
        return
    cache_available = (
        os.path.isfile(self.sql_db_path) and os.stat(self.sql_db_path).st_size > 0
    )
    if not cache_available:
        self.load_from_sparql()
        entity_prefix = "http://www.wikidata.org/entity/"
        for row in self.lod:
            full_pid = row["pid"]
            row_lang = row["lang"]
            short_pid = full_pid.replace(entity_prefix, "")
            row["pid"] = short_pid
            # the primary key combines pid and language
            row["id"] = f"{short_pid}-{row_lang}"
        self.store()
    else:
        self.load_from_sql()
    self.init_props()
    self.loaded = True

load_from_sparql()

get my list of dicts from sparql

Source code in ez_wikidata/wdproperty.py
208
209
210
211
212
213
214
def load_from_sparql(self):
    """
    fetch my list of dicts by running my SPARQL query
    """
    lang_count = len(self.langs)
    profile_msg = f"getting wikidata properties for {lang_count} languages via SPARQL"
    profiler = Profiler(profile_msg, profile=self.profile)
    records = self.sparql.queryAsListOfDicts(self.sparql_query)
    self.lod = records
    profiler.time()

load_from_sql()

load from SQL

Source code in ez_wikidata/wdproperty.py
237
238
239
240
241
242
243
244
245
246
def load_from_sql(self):
    """
    read my list of dicts from the wd_properties table of the SQL cache
    """
    profile_msg = "loading wikidata properties from SQL"
    profiler = Profiler(profile_msg, profile=self.profile)
    self.lod = self.sql_db.query("SELECT * FROM wd_properties")
    profiler.time()

store()

store my list of dicts

Source code in ez_wikidata/wdproperty.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def store(self):
    """
    write my list of dicts to the wd_properties table of the SQL cache

    The table is dropped and re-created before the records are stored.
    """
    profile_msg = "caching wikidata properties to SQL"
    profiler = Profiler(profile_msg, profile=self.profile)
    table_options = {
        "listOfRecords": self.lod,
        "entityName": "wd_properties",
        "primaryKey": "id",
        "withCreate": True,
        "withDrop": True,
        "sampleRecordCount": 100,
    }
    self.entity_info = self.sql_db.createTable(**table_options)
    store_options = {
        "listOfRecords": self.lod,
        "entityInfo": self.entity_info,
        "executeMany": True,
        "fixNone": True,
    }
    self.sql_db.store(**store_options)
    profiler.time()

wdsearch

Created on 2022-07-24

@author: wf

WikidataSearch

Bases: object

Wikidata Search API wrapper

Source code in ez_wikidata/wdsearch.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class WikidataSearch(object):
    """
    Wikidata Search API wrapper
    """

    def __init__(self, language: str = "en", timeout: float = 2.0):
        """
        Constructor

        Args:
            language(str): the language to use e.g. en/fr
            timeout(float): maximum time to wait for result
        """
        self.language = language
        self.timeout = timeout

    def searchOptions(
        self, searchFor: str, limit: int = 9
    ) -> List[Tuple[str, str, str]]:
        """
        Search and return a list of qid, itemLabel, description tuples.

        Args:
            searchFor (str): the string to search for.
            limit (int): the maximum amount of results to return.

        Returns:
            List[Tuple[str, str, str]]:
            A list of tuples containing
            qid, itemLabel, and description.
            Label and description are empty strings when the search result
            does not provide them; the list is empty when the search failed.
        """
        options = []
        srlist = self.search(searchFor, limit)
        if srlist is not None:
            for sr in srlist:
                qid = sr["id"]
                # a result may come without a label - treat it like a missing
                # description instead of failing with a KeyError
                itemLabel = sr.get("label", "")
                desc = ""
                if "display" in sr:
                    display = sr["display"]
                    if "description" in display:
                        desc = display["description"]["value"]
                options.append(
                    (
                        qid,
                        itemLabel,
                        desc,
                    )
                )
        return options

    def search(self, searchFor: str, limit: int = 9):
        """
        call the wbsearchentities API of wikidata

        Args:
            searchFor(str): the string to search for
            limit(int): the maximum amount of results to search for

        Returns:
            the "search" entry of the JSON response or None if anything went wrong
        """
        try:
            apiurl = f"https://www.wikidata.org/w/api.php?action=wbsearchentities&language={self.language}&uselang={self.language}&format=json&limit={limit}&search="
            searchEncoded = urllib.parse.quote_plus(searchFor)
            apisearch = apiurl + searchEncoded
            with urllib.request.urlopen(apisearch, timeout=self.timeout) as url:
                searchResult = json.loads(url.read().decode())
            return searchResult["search"]
        except Exception as _error:
            # best effort: any failure (network, timeout, unexpected payload)
            # is reported to the caller as None
            return None

    def getProperties(self):
        """
        get the Wikidata Properties

        Returns:
            the parsed content of resources/wdprops.json next to this module
        """
        scriptdir = os.path.dirname(__file__)
        jsonPath = f"{scriptdir}/resources/wdprops.json"
        # JSON is UTF-8 - do not depend on the platform default encoding
        with open(jsonPath, encoding="utf-8") as jsonFile:
            props = json.load(jsonFile)
        return props

__init__(language='en', timeout=2.0)

Constructor

Parameters:

Name Type Description Default
language(str)

the language to use e.g. en/fr

required
timeout(float)

maximum time to wait for result

required
Source code in ez_wikidata/wdsearch.py
18
19
20
21
22
23
24
25
26
27
def __init__(self, language: str = "en", timeout: float = 2.0):
    """
    set up the search wrapper

    Args:
        language(str): the language code to search with, e.g. en/fr
        timeout(float): how long to wait for a result at most
    """
    self.language, self.timeout = language, timeout

getProperties()

get the Wikidata Properties

Source code in ez_wikidata/wdsearch.py
81
82
83
84
85
86
87
88
89
def getProperties(self):
    """
    get the Wikidata Properties

    Returns:
        the parsed content of resources/wdprops.json next to this module
    """
    scriptdir = os.path.dirname(__file__)
    jsonPath = f"{scriptdir}/resources/wdprops.json"
    # JSON is UTF-8 - do not depend on the platform default encoding
    with open(jsonPath, encoding="utf-8") as jsonFile:
        props = json.load(jsonFile)
    return props

search(searchFor, limit=9)

Parameters:

Name Type Description Default
searchFor(str)

the string to search for

required
limit(int)

the maximum amount of results to search for

required
Source code in ez_wikidata/wdsearch.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def search(self, searchFor: str, limit: int = 9):
    """
    query the wbsearchentities API of wikidata

    Args:
        searchFor(str): the string to search for
        limit(int): the maximum amount of results to search for

    Returns:
        the "search" entry of the JSON response, or None when anything fails
    """
    try:
        base_url = f"https://www.wikidata.org/w/api.php?action=wbsearchentities&language={self.language}&uselang={self.language}&format=json&limit={limit}&search="
        request_url = base_url + urllib.parse.quote_plus(searchFor)
        with urllib.request.urlopen(request_url, timeout=self.timeout) as response:
            payload = response.read().decode()
            parsed = json.loads(payload)
        return parsed["search"]
    except Exception as _error:
        # best effort: every failure is reported as None
        return None

searchOptions(searchFor, limit=9)

Search and return a list of qid, itemLabel, description tuples.

Parameters:

Name Type Description Default
searchFor str

the string to search for.

required
limit int

the maximum amount of results to return.

9

Returns:

Type Description
List[Tuple[str, str, str]]

A list of tuples containing qid, itemLabel, and description.

Source code in ez_wikidata/wdsearch.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def searchOptions(
    self, searchFor: str, limit: int = 9
) -> List[Tuple[str, str, str]]:
    """
    Search and return a list of qid, itemLabel, description tuples.

    Args:
        searchFor (str): the string to search for.
        limit (int): the maximum amount of results to return.

    Returns:
        List[Tuple[str, str, str]]:
        A list of tuples containing
        qid, itemLabel, and description.
        Label and description are empty strings when the search result
        does not provide them; the list is empty when the search failed.
    """
    options = []
    srlist = self.search(searchFor, limit)
    if srlist is not None:
        for sr in srlist:
            qid = sr["id"]
            # a result may come without a label - treat it like a missing
            # description instead of failing with a KeyError
            itemLabel = sr.get("label", "")
            desc = ""
            if "display" in sr:
                display = sr["display"]
                if "description" in display:
                    desc = display["description"]["value"]
            options.append((qid, itemLabel, desc))
    return options

wikidata

Created on 2022-04-18

@author: wf

UrlReference

Bases: Reference

Reference consisting of reference URL (P854) retrieved (P813)

Source code in ez_wikidata/wikidata.py
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
class UrlReference(Reference):
    """
    Reference made up of two claims:
        reference URL (P854)
        retrieved (P813)
    """

    def __init__(
        self, url, date: Union[str, datetime.date, datetime.datetime, None] = None
    ):
        """
        constructor

        Args:
            url: reference URL
            date: retrieved at - today's date is used when None
        """
        super().__init__()
        self.url = url
        retrieved = datetime.date.today() if date is None else date
        self.date = retrieved
        url_claim = URL(value=self.url, prop_nr="P854")
        self.add(url_claim)
        retrieved_claim = Wikidata.get_date_claim(retrieved, prop_nr="P813")
        self.add(retrieved_claim)

__init__(url, date=None)

Constructor.

Args: url – the reference URL; date – the retrieval date ("retrieved at")

Source code in ez_wikidata/wikidata.py
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
def __init__(
    self, url, date: Union[str, datetime.date, datetime.datetime, None] = None
):
    """
    constructor

    Args:
        url: reference URL
        date: retrieved at - today's date is used when None
    """
    super().__init__()
    self.url = url
    retrieved = datetime.date.today() if date is None else date
    self.date = retrieved
    url_claim = URL(value=self.url, prop_nr="P854")
    self.add(url_claim)
    retrieved_claim = Wikidata.get_date_claim(retrieved, prop_nr="P813")
    self.add(retrieved_claim)

Wikidata

wikidata access

see http://learningwikibase.com/data-import/

Source code in ez_wikidata/wikidata.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
class Wikidata:
    """
    wikidata access

    Convenience layer on top of WikibaseIntegrator to read records from
    items (``get_record``) and to create or update items from records
    (``add_record``) based on PropertyMapping definitions.

    see http://learningwikibase.com/data-import/
    """

    # base url of the test instance of wikidata
    TEST_WD_URL = "https://test.wikidata.org"
    # base url of the production wikidata instance (default)
    WD_URL = "https://www.wikidata.org"

    def __init__(
        self,
        baseurl: str = None,
        wpm: WikidataPropertyManager = None,
        debug: bool = False,
    ):
        """
        Constructor

        Args:
            baseurl(str): the baseurl of the wikibase to use - if None WD_URL is used
            debug(bool): if True output debug information
            wpm(WikidataPropertyManager): the property manager to use - if None
                WikidataPropertyManager.get_instance() is used
        """
        if baseurl is None:
            baseurl = self.WD_URL
        self.baseurl = baseurl
        self.debug = debug
        self.apiurl = f"{self.baseurl}/w/api.php"
        self.login = None
        self.user = None
        self._wbi = None
        if wpm is None:
            wpm = WikidataPropertyManager.get_instance()
        self.wpm = wpm

    @property
    def wbi(self) -> WikibaseIntegrator:
        """
        WikibaseIntegrator

        The integrator is created lazily and is re-created when a login
        has been set after an integrator without login was created.
        """
        if self._wbi is None or (self.login is not None and self._wbi.login is None):
            wbi_config[
                "USER_AGENT"
            ] = f"{Version.name}/{Version.version} (https://www.wikidata.org/wiki/User:{self.user})"
            wbi_config["MEDIAWIKI_API_URL"] = self.apiurl
            self._wbi = WikibaseIntegrator(login=self.login)
        return self._wbi

    @wbi.setter
    def wbi(self, wbi: typing.Union[WikibaseIntegrator, None]):
        """
        set the WikibaseIntegrator

        Args:
            wbi: the integrator to use or None to force a lazy re-creation
        """
        self._wbi = wbi

    def getCredentials(self) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
        """
        get my credentials https://test.wikidata.org/wiki/Property:P370

        from the wd npm command line tool

        The credentials are read from ~/.config/wikibase-cli/config.json.
        If the test wikidata url is used and has no entry of its own the
        credentials of the production wikidata url are used instead.

        Throws:
            Exception: if the config file exists but no credentials are available for the baseurl

        Returns:
            (username, password) of the account assigned to the baseurl
            or (None, None) if there is no config file
        """
        user = None
        pwd = None
        home = str(Path.home())
        configFilePath = f"{home}/.config/wikibase-cli/config.json"
        if os.path.isfile(configFilePath):
            # JSON is UTF-8 by specification - do not depend on the locale
            with open(configFilePath, mode="r", encoding="utf-8") as f:
                wikibaseConfigJson = json.load(f)
            # a config file without credentials section is handled like
            # a missing credential entry instead of raising a bare KeyError
            credentials = wikibaseConfigJson.get("credentials") or {}
            credentialRecord = credentials.get(self.baseurl, None)
            if (
                self.baseurl == self.TEST_WD_URL
                and self.baseurl not in credentials
                and self.WD_URL in credentials
            ):
                credentialRecord = credentials.get(self.WD_URL)
            if credentialRecord is None:
                raise Exception(f"no credentials available for {self.baseurl}")
            user = credentialRecord["username"]
            pwd = credentialRecord["password"]
        return user, pwd

    def loginWithCredentials(self, user: str = None, pwd: str = None):
        """
        login using the given credentials or credentials
        retrieved via self.getCredentials

        Args:
            user(str): the username - if None user and password are taken from getCredentials
            pwd(str): the password
        """
        if user is None:
            user, pwd = self.getCredentials()

        if user is not None:
            self.login = wbi_login.Login(
                user=user, password=pwd, mediawiki_api_url=self.apiurl
            )
            if self.login:
                self.user = user

    def logout(self):
        """
        log the user out again
        """
        self.user = None
        self.login = None
        # drop the integrator so that the next access creates a fresh one
        self.wbi = None

    def getItemByName(
        self, itemName: str, itemType: str, lang: str = "en"
    ) -> typing.Optional[str]:
        """
        get an item by Name
        ToDo: Needs to be reworked as always WDQS is used as endpoint even if a different one is defined
        Args:
            itemName(str): the item to look for
            itemType(str): the type of the item
            lang(str): the language of the itemName

        Returns:
            the id of the first item found or None if there is no match
        """
        # escape the characters that would terminate or corrupt the
        # SPARQL string literal the name is embedded in
        escapedName = (
            str(itemName)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )
        itemLabel = f'"{escapedName}"@{lang}'
        sparqlQuery = """PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            PREFIX wdt: <http://www.wikidata.org/prop/direct/>
            PREFIX wd: <http://www.wikidata.org/entity/>

            SELECT ?item ?itemLabel
            WHERE {
              {
                ?item wdt:P31|wdt:P31/wdt:P279 wd:%s.
                ?item rdfs:label ?itemLabel.
                # short name
                BIND(%s as ?shortNameLabel )
                ?item wdt:P1813 ?shortNameLabel
                FILTER(LANG(?itemLabel)= "%s" )
              } UNION {
                ?item wdt:P31|wdt:P31/wdt:P279 wd:%s.
                BIND(%s as ?itemLabel )
                ?item rdfs:label ?itemLabel.
              }
            }""" % (
            itemType,
            itemLabel,
            lang,
            itemType,
            itemLabel,
        )
        endpointUrl = "https://query.wikidata.org/sparql"
        sparql = SPARQL(endpointUrl)
        itemRows = sparql.queryAsListOfDicts(sparqlQuery)
        item = None
        if itemRows:
            item = itemRows[0]["item"].replace("http://www.wikidata.org/entity/", "")
        return item

    def addDict(
        self,
        row: dict,
        mapDict: dict,
        itemId: Union[str, None] = None,
        lang: str = "en",
        write: bool = False,
        ignoreErrors: bool = False,
    ) -> WikidataResult:
        """
        add the given row mapping with the given map Dict

        Args:
            row(dict): the data row to add
            mapDict(dict): the mapping dictionary to use
            itemId: wikidata id of the item the data should be added to. If None a new item is created unless item id is provided in the record
            lang(str): the language for lookups
            write(bool): if True do actually write
            ignoreErrors(bool): if True ignore errors

        Returns:
            WikiDataResult: the result of the operation
        """
        mappings = self.wpm.get_mappings_for_records(mapDict)
        return self.add_record(
            row,
            mappings,
            item_id=itemId,
            lang=lang,
            write=write,
            ignore_errors=ignoreErrors,
        )

    def get_record(
        self,
        item_id: str,
        property_mappings: Union[
            List[str], List["PropertyMapping"], typing.Dict[str, dict]
        ],
        include_label: bool = True,
        include_description: bool = True,
        label_for_qids: bool = False,
    ) -> dict:
        """
        Get the properties from the given item
        Args:
            item_id: id of the item to get the data from
            property_mappings: list of property values to extract - either plain
                property ids, PropertyMappings or a dict of mapping records
            include_label: if True add the english label with the key "label"
            include_description: if True add the english description with the key "description"
            label_for_qids: If True fetch the label for a linked Qid. For a
                PropertyMapping this is only done if a valueLookupType is set
        Returns:
            dict with the property values. A property with exactly one value
            is stored as scalar, without value as None and otherwise as list
        """
        item = self.wbi.item.get(item_id)
        lang = "en"
        if isinstance(property_mappings, dict):
            property_mappings = PropertyMapping.from_records(property_mappings)
        record = dict()
        if include_label:
            item_label = item.labels.get(lang)
            if item_label is not None:
                record["label"] = item_label.value
        if include_description:
            item_description = item.descriptions.get(lang)
            if item_description is not None:
                record["description"] = item_description.value
        qualifier_lookup = PropertyMapping.get_qualifier_lookup(property_mappings)
        # qualifiers are handled together with the statement they belong to
        pms = [
            pm
            for pm in property_mappings
            if not isinstance(pm, PropertyMapping) or not pm.is_qualifier()
        ]
        for prop in pms:
            # plain property id strings carry no mapping information
            is_mapping = isinstance(prop, PropertyMapping)
            prop_id = prop.propertyId if is_mapping else prop
            prop_label = prop.column if is_mapping else prop_id
            statements = self._get_statements_by_pid(item, prop_id)
            values = []
            for statement in statements:
                value = self._get_statement_value(statement)
                if (
                    label_for_qids
                    and statement.mainsnak.datatype == "wikibase-item"
                    and (not is_mapping or prop.valueLookupType is not None)
                ):
                    label = self.get_item_label(value, lang)
                    if label is not None:
                        value = label
                values.append(value)
                if is_mapping and prop.column in qualifier_lookup:
                    for qualifier_pm in qualifier_lookup[prop.column]:
                        if qualifier_pm.propertyId in statement.qualifiers.qualifiers:
                            qualifier_statements = statement.qualifiers.get(
                                qualifier_pm.propertyId
                            )
                        else:
                            qualifier_statements = []
                        qualifier_values = [
                            self._get_statement_value(qualifier_statement)
                            for qualifier_statement in qualifier_statements
                        ]
                        record[qualifier_pm.column] = (
                            qualifier_values[0]
                            if len(qualifier_values) == 1
                            else qualifier_values
                        )
            if len(values) == 1:
                record[prop_label] = values[0]
            elif values == []:
                record[prop_label] = None
            else:
                record[prop_label] = values
        return record

    def get_item_label(self, item_id: str, lang: str = None) -> typing.Union[str, None]:
        """
        Get the label for the given item id
        Args:
            item_id: id of the item
            lang: label language to return. Default is "en"

        Returns:
            str: label of the item
            None: If the item_id is None or the item has no label in the given language
        """
        if lang is None:
            lang = "en"
        label = None
        if item_id is not None:
            linked_item = self.wbi.item.get(item_id)
            linked_item_label = linked_item.labels.get(lang)
            if linked_item_label is not None:
                label = linked_item_label.value
        return label

    def _get_statements_by_pid(self, item: ItemEntity, pid: str) -> List[Claim]:
        """
        Get the property statements of the item for the given Pid.
        if ranking is established between the statements return only the highest rank
        Args:
            item: item to get the statements from
            pid: property id
        Returns:
            list: list of the property statements
        """
        if pid in item.claims:
            statements = item.claims.get(pid)
        else:
            statements = []
        if len(statements) > 1:
            # collect per rank with a dict - itertools.groupby only groups
            # adjacent elements and would lose statements if the ranks
            # are interleaved
            statements_by_rank = {}
            for statement in statements:
                statements_by_rank.setdefault(statement.rank, []).append(statement)
            rank_by_preference = [
                WikibaseRank.PREFERRED,
                WikibaseRank.NORMAL,
                WikibaseRank.DEPRECATED,
            ]
            for rank in rank_by_preference:
                if rank in statements_by_rank:
                    statements = statements_by_rank[rank]
                    break
        return statements

    def _get_statement_value(self, statement: Union[Claim, Snak]) -> typing.Any:
        """
        Get the raw value of the statement without the metadata
        Args:
            statement: statement to extract the value from

        Returns:
            raw value of the statement or None if the statement has no
            value or the datatype is not supported
        """
        value = None
        snak = statement
        if isinstance(statement, Claim):
            snak = statement.mainsnak
        datavalue = snak.datavalue
        raw_value = datavalue.get("value") if datavalue else None
        if raw_value is None:
            # snak without a datavalue - there is nothing to extract
            return None
        datatype = snak.datatype
        if datatype == "wikibase-item":
            value = raw_value.get("id", None)
        elif datatype == "monolingualtext":
            value = raw_value.get("text")
        elif datatype in ("string", "url", "external-id"):
            value = raw_value
        elif datatype == "time":
            # skip the leading sign of the wikibase time string
            value = dateutil.parser.parse(raw_value.get("time")[1:])
            precision = raw_value.get("precision")
            if precision == 11:
                # day precision
                value = value.date()
            elif precision == 9:
                # year precision
                value = value.year
        return value

    def add_record(
        self,
        record: dict,
        property_mappings: List["PropertyMapping"],
        item_id: Union[str, None] = None,
        lang: str = "en",
        write: bool = False,
        ignore_errors: bool = False,
        summary: str = None,
        reference: Reference = None,
    ) -> WikidataResult:
        """
        add the given row mapping with the given map Dict

        Args:
            record(dict): the data row to add
            property_mappings(list): the mapping dictionary to use
            item_id: wikidata id of the item the data should be added to. If None a new item is created unless item id is provided in the record
            lang(str): the language for lookups
            write(bool): if True do actually write
            ignore_errors(bool): if True ignore errors
            summary: summary of the item edits
            reference: reference to add to all claims

        Returns:
            WikidataResult: the item created or updated and a dict of errors
        """
        claims = []
        errors = dict()
        qualifier_lookup = PropertyMapping.get_qualifier_lookup(property_mappings)
        # check if there is a existing Q-Item identifier in the record
        item_mapping = PropertyMapping.get_item_mapping(property_mappings)
        if item_mapping is not None and item_id is None:
            item_id = record.get(item_mapping.column, None)
        # get the relevant properties - qualifiers are added together with
        # their statement and the item id mapping is no statement at all
        properties = [
            pm
            for pm in property_mappings
            if not pm.is_qualifier() and not pm.is_item_itself()
        ]

        for prop in properties:
            qualifier_mappings = qualifier_lookup.get(prop.column, None)
            prop_claims, claim_errors = self._get_statement_for_property(
                record, prop, qualifier_mappings, reference, lang
            )
            # merge error dicts to one dict
            errors = {**errors, **claim_errors}
            claims.extend(prop_claims)
        label = self.sanitize_label(record.get("label", None))
        description = record.get("description", None)
        # handle get or create case
        item = self.get_or_create_item(item_id)
        item.add_claims(claims)
        if label:
            item.labels.set(language=lang, value=label)
        if description:
            item.descriptions.set(language=lang, value=description)
        if write:
            if len(errors) == 0 or ignore_errors:
                try:
                    item = item.write(summary=summary)
                except Exception as ex:
                    # report the failure via the result instead of raising
                    errors["write failed"] = ex
        result = WikidataResult(item=item, errors=errors, debug=self.debug)
        return result

    def _get_statement_for_property(
        self,
        record: dict,
        prop_mapping: "PropertyMapping",
        qualifier_mappings: Union[List["PropertyMapping"], None],
        reference: Reference,
        lang: str,
    ) -> (List[Claim], dict):
        """
        Get the claims that can be derived from the given property mapping and record.
        Generates a statement with its qualifiers and reference from the given record and mapping.
        If the record value of the property is a list multiple claims are generated

        Args:
            record: data record
            prop_mapping: property definition for the claims that should be generated from the given record
            qualifier_mappings: descriptions of the qualifiers of the property
            reference: reference of the statement
            lang: language to use

        Returns:
            list of statements and a dict of the errors that occurred with the column as key
        """
        claims = []
        value = self.get_prop_value(record, prop_mapping, lang)
        values = value if isinstance(value, list) else [value]
        errors = dict()
        for value in values:
            statement = None
            try:
                statement = self.convert_to_claim(value=value, pm=prop_mapping)
            except Exception as ex:
                errors[prop_mapping.column] = ex
                if self.debug:
                    print(traceback.format_exc())
            if statement is None:
                # no value or conversion failed
                continue
            # add reference
            if reference is not None:
                statement.references.add(reference)
            # add qualifier
            if qualifier_mappings is not None:
                qualifier_errors = self._add_qualifier_to_statement(
                    record, statement, qualifier_mappings, lang
                )
                # merge error dicts to one dict
                errors = {**errors, **qualifier_errors}
            claims.append(statement)
        return claims, errors

    def _add_qualifier_to_statement(
        self,
        record: dict,
        statement: Claim,
        qualifier_mappings: List["PropertyMapping"],
        lang: str,
    ) -> dict:
        """
        add the qualifiers to the given statement
        Args:
            record: data record to take the qualifier values from
            statement: add qualifiers to this statement
            qualifier_mappings: list of PropertyMappings of the qualifiers
            lang: language to use for lookups

        Returns:
            dict of occurred errors with the qualifier column as key. If no error occurs an empty dict is returned
        """
        errors = dict()
        for qualifier_pm in qualifier_mappings:
            qualifier_value = self.get_prop_value(record, qualifier_pm, lang)
            if qualifier_value is None:
                continue
            try:
                qualifier = self.convert_to_claim(qualifier_value, qualifier_pm)
                # convert_to_claim returns None for empty values
                if qualifier is not None:
                    statement.qualifiers.add(qualifier)
            except Exception as ex:
                errors[qualifier_pm.column] = ex
                if self.debug:
                    print(traceback.format_exc())
        return errors

    def get_or_create_item(self, item_id: typing.Union[str, None]) -> ItemEntity:
        """
        Get or create the requested wikidata item
        Args:
            item_id: item to retrieve if None or blank create a new item

        Returns:
            the existing item for the given id or a new item
        """
        is_blank = isinstance(item_id, str) and item_id.strip() == ""
        if item_id is None or is_blank:
            item = self.wbi.item.new()
        else:
            item = self.wbi.item.get(item_id)
        return item

    def _prepare_prop_value(
        self, value: typing.Any, pm: "PropertyMapping", lang: str
    ) -> typing.Any:
        """
        Prepare a single property value: strip strings and resolve the
        wikidata item id if the mapping asks for a lookup
        """
        if isinstance(value, str):
            value = value.strip()
            if value and pm.valueLookupType and not self.is_wikidata_item_id(value):
                # find the wikidata item id of value
                value = self.getItemByName(value, pm.valueLookupType, lang)
        return value

    def get_prop_value(
        self, record: dict, pm: "PropertyMapping", lang: str
    ) -> typing.Any:
        """
        Retrieve the property value from the record and prepare the value if necessary
        Args:
            record: record containing the property data
            pm: property mapping
            lang: language to use

        Returns:
            value of the property from the record - the default value of the
            mapping if the record has no value. The elements of a list value
            are prepared one by one
        """
        value = record.get(pm.column, None)
        if value is None:
            value = pm.value
        if isinstance(value, list):
            # multiple values lead to multiple claims
            return [self._prepare_prop_value(v, pm, lang) for v in value]
        return self._prepare_prop_value(value, pm, lang)

    def convert_to_claim(
        self, value, pm: "PropertyMapping"
    ) -> Union[BaseDataType, None]:
        """
        Convert the given value to a corresponding wikidata statement
        Args:
            value: value of the statement
            pm: information about the property statement to generate

        Raises:
            Exception: if property datatype is unknown or not supported

        Returns:
            BaseDataType or None if the value is None or empty
        """
        if value is None or value == "":
            return None
        if pm.property_type_enum is None:
            # cache the looked up datatype in the mapping
            pm.property_type_enum = self.get_wddatatype_of_property(pm.propertyId)
        if pm.property_type_enum is WdDatatype.year:
            yearString = f"+{value}-01-01T00:00:00Z"
            statement = Time(
                yearString, prop_nr=pm.propertyId, precision=WikibaseDatePrecision.YEAR
            )
        elif pm.property_type_enum is WdDatatype.date:
            statement = self.get_date_claim(value, pm.propertyId)
        elif pm.property_type_enum is WdDatatype.extid:
            statement = ExternalID(value=value, prop_nr=pm.propertyId)
        elif pm.property_type_enum is WdDatatype.string:
            statement = String(value=str(value), prop_nr=pm.propertyId)
        elif pm.property_type_enum is WdDatatype.text:
            statement = MonolingualText(text=str(value), prop_nr=pm.propertyId)
        elif pm.property_type_enum is WdDatatype.url:
            statement = URL(value=value, prop_nr=pm.propertyId)
        elif pm.property_type_enum is WdDatatype.itemid:
            statement = Item(value=value, prop_nr=pm.propertyId)
        else:
            raise Exception(
                f"({pm.property_type_enum}) unknown or not supported datatype"
            )
        return statement

    @staticmethod
    def get_date_claim(
        date: Union[str, datetime.date, datetime.datetime], prop_nr: Union[str, int]
    ) -> Claim:
        """
        Get the date statement for the given date and property id

        The statement has day precision so any time of day or timezone
        of the given value is dropped.

        Args:
            date: date value
            prop_nr: id of the property

        Raises:
            Exception: if the value is neither a string nor a date or datetime

        Returns:
            statement of the given property number with the given value
        """
        # datetime.datetime is a subclass of datetime.date and therefore
        # has to be checked first
        if isinstance(date, str):
            day = dateutil.parser.parse(date).date()
        elif isinstance(date, datetime.datetime):
            day = date.date()
        elif isinstance(date, datetime.date):
            day = date
        else:
            raise Exception(f"Value '{date}' can not be parsed to date")
        date_string = f"+{day.isoformat()}T00:00:00Z"
        statement = Time(
            date_string, prop_nr=prop_nr, precision=WikibaseDatePrecision.DAY
        )
        return statement

    @staticmethod
    def is_wikidata_item_id(value: str) -> bool:
        """
        Returns true if the given value is a wikidata item id

        Values that are not strings are never item ids
        """
        return isinstance(value, str) and bool(re.fullmatch(r"Q[0-9]+", value))

    @staticmethod
    def is_wikidata_property_id(value: str) -> bool:
        """
        Returns true if the given value is a wikidata property id

        Values that are not strings are never property ids
        """
        return isinstance(value, str) and bool(re.fullmatch(r"P[0-9]+", value))

    @staticmethod
    def sanitize_label(label: str, limit: int = None, postfix: str = None) -> str:
        """
        sanitize given label by ensuring it is not too long
        Args:
            label: label to sanitize
            limit: max length of the label. Default is 250
            postfix: marker replacing the end of a shortened label. Default is "..."

        Returns:
            sanitized label which is never longer than limit or None if the label is None
        """
        if limit is None:
            limit = 250
        if postfix is None:
            postfix = "..."
        if label is not None and len(label) > limit:
            if len(postfix) < limit:
                label = label[: limit - len(postfix)] + postfix
            else:
                # the postfix does not fit - a negative slice index would
                # return a label longer than the limit so cut hard instead
                label = label[: max(limit, 0)]
        return label

    @classmethod
    def get_datatype_of_property(cls, property_id: Union[str, int]) -> Union[str, None]:
        """
        Get the datatype of the given property
        Args:
            property_id: id of the property e.g. P31 or 31

        Returns:
            datatype of the property or None if no datatype is defined
        """
        if isinstance(property_id, int) or not property_id.startswith("P"):
            property_id = f"P{property_id}"
        query = """
            PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            PREFIX wdt: <http://www.wikidata.org/prop/direct/>
            PREFIX wd: <http://www.wikidata.org/entity/>
            PREFIX wikibase: <http://wikiba.se/ontology#>

            SELECT Distinct ?o
            WHERE {
              wd:%s wikibase:propertyType ?o.
            }
        """ % (
            property_id
        )
        endpointUrl = "https://query.wikidata.org/sparql"
        sparql = SPARQL(endpointUrl)
        itemRows = sparql.queryAsListOfDicts(query)
        wikibase_prefix = "http://wikiba.se/ontology#"
        # strip the ontology prefix to get the plain type name
        property_types = [
            record.get("o")[len(wikibase_prefix) :] for record in itemRows
        ]
        if len(property_types) > 1:
            print("Property has more than one type! please check")
        elif len(property_types) == 0:
            print("Property has no defined datatype! please check")
            return None
        return property_types[0]

    @classmethod
    def get_wddatatype_of_property(cls, property_id: Union[str, int]) -> "WdDatatype":
        """
        Get the datatype of the given property
        Args:
            property_id: id of the property e.g. P31 or 31

        Returns:
            WdDatatype derived via WdDatatype.get_by_wikibase from the
            datatype name of the property
        """
        property_type = cls.get_datatype_of_property(property_id)
        return WdDatatype.get_by_wikibase(property_type)

    def normalize_records(self, record: dict, prop_map: typing.List["PropertyMapping"]):
        """
        Normalize given record by converting Qids to WikidataItem objects (lookup label) and find out Qid if label given
        based on the given prop_map

        Args:
            record: the record to normalize - it is modified in place
            prop_map: the property mappings describing the record

        Returns:
            the given record
        """
        itemid_props = [
            p for p in prop_map if p.property_type_enum is WdDatatype.itemid
        ]
        for p in itemid_props:
            if p.column is None or p.column == "":
                continue
            value = record.get(p.column, None)
            if value is None and p.value is not None:
                value = p.value
            if isinstance(value, list):
                wd_item = [self.get_wikidata_item(v, p.valueLookupType) for v in value]
            else:
                wd_item = self.get_wikidata_item(value, p.valueLookupType)
            record[p.column] = wd_item
        return record

    def get_wikidata_item(
        self, qid_or_label: str, item_type_qid: str = None
    ) -> typing.Optional["WikidataItem"]:
        """
        Get WikidataItem for given label or Qid

        Args:
            qid_or_label: label or Qid of a item
            item_type_qid: Qid of the type of the item - used to find the Qid for a label

        Returns:
            WikidataItem or None if the input is None or no Qid could be found
        """
        item = None
        if qid_or_label is not None:
            if self.is_wikidata_item_id(qid_or_label):
                # lookup label
                qid = qid_or_label
                label = self.get_item_label(qid)
            else:
                # lookup qid
                label = qid_or_label
                qid = self.getItemByName(label, item_type_qid)
            if qid is not None:
                item = WikidataItem(qid, label)
        return item

wbi: WikibaseIntegrator property writable

WikibaseIntegrator

__init__(baseurl=None, wpm=None, debug=False)

Constructor

Parameters:

Name Type Description Default
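baseurl(str)

the baseurl of the wikibase to use; if None, Wikidata.WD_URL (https://www.wikidata.org) is used

None
debug(bool)

if True output debug information

False
wpm(WikidataPropertyManager)

the property manager to use; if None, WikidataPropertyManager.get_instance() is used

None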
Source code in ez_wikidata/wikidata.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def __init__(
    self,
    baseurl: str = None,
    wpm: WikidataPropertyManager = None,
    debug: bool = False,
):
    """
    Set up the access to a wikibase instance

    Args:
        baseurl(str): the baseurl of the wikibase to use - if None WD_URL is used
        debug(bool): if True output debug information
        wpm(WikidataPropertyManager): the property manager to use - if None
            WikidataPropertyManager.get_instance() is used
    """
    # the api url is derived from the effective base url
    self.baseurl = self.WD_URL if baseurl is None else baseurl
    self.apiurl = f"{self.baseurl}/w/api.php"
    self.debug = debug
    # nobody is logged in yet and no integrator has been created
    self.login = None
    self.user = None
    self._wbi = None
    self.wpm = WikidataPropertyManager.get_instance() if wpm is None else wpm

addDict(row, mapDict, itemId=None, lang='en', write=False, ignoreErrors=False)

add the given row mapping with the given map Dict

Parameters:

Name Type Description Default
row(dict)

the data row to add

required
mapDict(dict)

the mapping dictionary to use

required
itemId Union[str, None]

wikidata id of the item the data should be added to. If None a new item is created unless item id is provided in the record

None
lang(str)

the language for lookups

'en'
write(bool)

if True do actually write

False
ignoreErrors(bool)

if True ignore errors

False

Returns:

Name Type Description
WikiDataResult WikidataResult

the result of the operation

Source code in ez_wikidata/wikidata.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def addDict(
    self,
    row: dict,
    mapDict: dict,
    itemId: Union[str, None] = None,
    lang: str = "en",
    write: bool = False,
    ignoreErrors: bool = False,
) -> WikidataResult:
    """
    Add the given row using the property mappings derived from mapDict

    Legacy entry point that delegates to add_record.

    Args:
        row(dict): the data row to add
        mapDict(dict): the mapping dictionary to use
        itemId: wikidata id of the item the data should be added to. If None a new item is created unless item id is provided in the record
        lang(str): the language for lookups
        write(bool): if True do actually write
        ignoreErrors(bool): if True ignore errors

    Returns:
        WikidataResult: the result of the operation
    """
    # translate the mapping dict to property mappings first
    property_mappings = self.wpm.get_mappings_for_records(mapDict)
    # map the legacy camel case argument names to the add_record names
    options = {
        "item_id": itemId,
        "lang": lang,
        "write": write,
        "ignore_errors": ignoreErrors,
    }
    result = self.add_record(row, property_mappings, **options)
    return result

add_record(record, property_mappings, item_id=None, lang='en', write=False, ignore_errors=False, summary=None, reference=None)

add the given row mapping with the given map Dict

Parameters:

Name Type Description Default
record(dict)

the data row to add

required
property_mappings(list)

the mapping dictionary to use

required
item_id Union[str, None]

wikidata id of the item the data should be added to. If None a new item is created unless item id is provided in the record

None
lang(str)

the language for lookups

'en'
write(bool)

if True do actually write

False
ignore_errors(bool)

if True ignore errors

False
summary str

summary of the item edits

None
reference Reference

reference to add to all claims

None

Returns:

Type Description
WikidataResult

the result holding the wikidata item created or updated (if any) and a dict of errors

Source code in ez_wikidata/wikidata.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
def add_record(
    self,
    record: dict,
    property_mappings: List["PropertyMapping"],
    item_id: Union[str, None] = None,
    lang: str = "en",
    write: bool = False,
    ignore_errors: bool = False,
    summary: str = None,
    reference: Reference = None,
) -> WikidataResult:
    """
    Build the claims for one record and attach them to a wikidata item.

    Args:
        record(dict): the data row to add
        property_mappings(list): the property mappings describing how the
            record columns map to wikidata properties
        item_id: wikidata id of the item the data should be added to. If None
            a new item is created unless an item id is provided in the record
        lang(str): the language for lookups, label and description
        write(bool): if True do actually write
        ignore_errors(bool): if True write even if claim errors were collected
        summary: summary of the item edits
        reference: reference to add to all claims

    Returns:
        WikidataResult: wraps the item, the dict of collected errors and the
            debug flag of this instance
    """
    qualifiers_by_column = PropertyMapping.get_qualifier_lookup(property_mappings)
    # an explicitly passed item id wins over a Q-identifier found in the record
    id_mapping = PropertyMapping.get_item_mapping(property_mappings)
    if id_mapping is not None and item_id is None:
        item_id = record.get(id_mapping.column, None)
    # qualifiers and the item column itself do not yield statements of their own
    statement_mappings = [
        mapping
        for mapping in property_mappings
        if not (mapping.is_qualifier() or mapping.is_item_itself())
    ]
    all_claims = []
    collected_errors = {}
    for mapping in statement_mappings:
        new_claims, new_errors = self._get_statement_for_property(
            record,
            mapping,
            qualifiers_by_column.get(mapping.column, None),
            reference,
            lang,
        )
        # merge the per-property errors into the overall error dict
        collected_errors.update(new_errors)
        all_claims += new_claims
    label = self.sanitize_label(record.get("label", None))
    description = record.get("description", None)
    # handle get or create case
    item = self.get_or_create_item(item_id)
    item.add_claims(all_claims)
    if label:
        item.labels.set(language=lang, value=label)
    if description:
        item.descriptions.set(language=lang, value=description)
    if write and (not collected_errors or ignore_errors):
        try:
            item = item.write(summary=summary)
        except Exception as ex:
            # a failed write is reported via the result instead of being raised
            collected_errors["write failed"] = ex
    return WikidataResult(item=item, errors=collected_errors, debug=self.debug)

convert_to_claim(value, pm)

Convert the given value to a corresponding wikidata statement Args: value: value of the statement pm: information about the property statement to generate

Raises:

Type Description
Exception

if property datatype is unknown or not supported

Returns:

Type Description
Union[BaseDataType, None]

BaseDataType

Source code in ez_wikidata/wikidata.py
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
def convert_to_claim(
    self, value, pm: "PropertyMapping"
) -> Union[BaseDataType, None]:
    """
    Turn a raw value into the wikidata statement matching the property datatype.

    Args:
        value: value of the statement
        pm: information about the property statement to generate; a missing
            property_type_enum is looked up and stored on pm

    Raises:
        Exception: if property datatype is unknown or not supported

    Returns:
        BaseDataType: the statement, or None if value is None or empty
    """
    if value is None or value == "":
        return None
    if pm.property_type_enum is None:
        pm.property_type_enum = self.get_wddatatype_of_property(pm.propertyId)
    datatype = pm.property_type_enum
    prop_nr = pm.propertyId
    if datatype is WdDatatype.year:
        # a year is stored as January 1st of that year with YEAR precision
        return Time(
            f"+{value}-01-01T00:00:00Z",
            prop_nr=prop_nr,
            precision=WikibaseDatePrecision.YEAR,
        )
    if datatype is WdDatatype.date:
        return self.get_date_claim(value, prop_nr)
    if datatype is WdDatatype.extid:
        return ExternalID(value=value, prop_nr=prop_nr)
    if datatype is WdDatatype.string:
        return String(value=str(value), prop_nr=prop_nr)
    if datatype is WdDatatype.text:
        return MonolingualText(text=str(value), prop_nr=prop_nr)
    if datatype is WdDatatype.url:
        return URL(value=value, prop_nr=prop_nr)
    if datatype is WdDatatype.itemid:
        return Item(value=value, prop_nr=prop_nr)
    raise Exception(f"({datatype}) unknown or not supported datatype")

getCredentials()

get my credentials https://test.wikidata.org/wiki/Property:P370

from the wd npm command line tool

Throws

Exception: if no credentials are available for the baseurl

Returns:

Type Description
(str, str)

(username, password) of the account assigned to the baseurl

Source code in ez_wikidata/wikidata.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def getCredentials(self) -> (str, str):
    """
    get my credentials https://test.wikidata.org/wiki/Property:P370

    from the configuration file of the wd npm command line tool
    (~/.config/wikibase-cli/config.json)

    When the base url is the test wikidata url and has no entry of its own,
    the entry of the main wikidata url is used instead.

    Raises:
        Exception: if the config file exists but holds no credentials for the baseurl

    Returns:
        (username, password) of the account assigned to the baseurl,
        (None, None) if there is no config file
    """
    config_path = Path.home() / ".config" / "wikibase-cli" / "config.json"
    if not config_path.is_file():
        return None, None
    # JSON is UTF-8 by definition - do not depend on the locale encoding
    with open(config_path, mode="r", encoding="utf-8") as config_file:
        wikibase_config = json.load(config_file)
    # a config without a "credentials" section is treated like one without
    # a matching entry instead of failing with a bare KeyError
    credentials = wikibase_config.get("credentials") or {}
    credential_record = credentials.get(self.baseurl, None)
    if credential_record is None and self.baseurl == self.TEST_WD_URL:
        credential_record = credentials.get(self.WD_URL, None)
    if credential_record is None:
        raise Exception(f"no credentials available for {self.baseurl}")
    return credential_record["username"], credential_record["password"]

getItemByName(itemName, itemType, lang='en')

get an item by Name ToDo: Needs to be reworked as always WDQS is used as endpoint even if a different one is defined Args: itemName(str): the item to look for itemType(str): the type of the item lang(str): the language of the itemName

Source code in ez_wikidata/wikidata.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def getItemByName(
    self, itemName: str, itemType: str, lang: str = "en"
) -> typing.Optional[str]:
    """
    get an item by Name
    ToDo: Needs to be reworked as always WDQS is used as endpoint even if a different one is defined

    The item is searched among the instances of itemType (or of a direct
    subclass of it) either by its short name (P1813) or by its label.

    Args:
        itemName(str): the item to look for
        itemType(str): the type of the item
        lang(str): the language of the itemName

    Returns:
        the Q-id of the first matching item or None if there is no match
    """
    # escape the characters that would terminate or corrupt the SPARQL
    # string literal (backslash first so the other escapes are not doubled)
    escapedName = (
        itemName.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    itemLabel = f'"{escapedName}"@{lang}'
    sparqlQuery = """PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX wdt: <http://www.wikidata.org/prop/direct/>
        PREFIX wd: <http://www.wikidata.org/entity/>

        SELECT ?item ?itemLabel
        WHERE {
          {
            ?item wdt:P31|wdt:P31/wdt:P279 wd:%s.
            ?item rdfs:label ?itemLabel.
            # short name
            BIND(%s as ?shortNameLabel )
            ?item wdt:P1813 ?shortNameLabel
            FILTER(LANG(?itemLabel)= "%s" )
          } UNION {
            ?item wdt:P31|wdt:P31/wdt:P279 wd:%s.
            BIND(%s as ?itemLabel )
            ?item rdfs:label ?itemLabel.
          }
        }""" % (
        itemType,
        itemLabel,
        lang,
        itemType,
        itemLabel,
    )
    endpointUrl = "https://query.wikidata.org/sparql"
    sparql = SPARQL(endpointUrl)
    itemRows = sparql.queryAsListOfDicts(sparqlQuery)
    item = None
    if len(itemRows) > 0:
        # reduce the entity url of the first hit to the plain Q-id
        item = itemRows[0]["item"].replace("http://www.wikidata.org/entity/", "")
    return item

get_datatype_of_property(property_id) classmethod

Get the datatype of the given property Args: property_id: id of the property e.g. P31 or 31

Returns:

Type Description
Union[str, None]

datatype of the property or None if no datatype is defined

Source code in ez_wikidata/wikidata.py
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
@classmethod
def get_datatype_of_property(cls, property_id: Union[str, int]) -> Union[str, None]:
    """
    Look up the wikibase datatype of a property via the Wikidata query service.

    Args:
        property_id: id of the property e.g. P31 or 31

    Returns:
        datatype of the property (without the wikibase ontology prefix)
        or None if no datatype is defined
    """
    needs_prefix = isinstance(property_id, int) or not property_id.startswith("P")
    if needs_prefix:
        property_id = f"P{property_id}"
    query = """
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX wdt: <http://www.wikidata.org/prop/direct/>
        PREFIX wd: <http://www.wikidata.org/entity/>
        PREFIX wikibase: <http://wikiba.se/ontology#>

        SELECT Distinct ?o
        WHERE {
          wd:%s wikibase:propertyType ?o.
        }
    """ % (
        property_id
    )
    rows = SPARQL("https://query.wikidata.org/sparql").queryAsListOfDicts(query)
    # the datatype is what follows the ontology prefix of the returned url
    prefix_length = len("http://wikiba.se/ontology#")
    datatypes = [row.get("o")[prefix_length:] for row in rows]
    if not datatypes:
        print("Property has no defined datatype! please check")
        return None
    if len(datatypes) > 1:
        print("Property has more than one type! please check")
    return datatypes[0]

get_date_claim(date, prop_nr) staticmethod

Get the data statement for the given date and property id Args: date: date value prop_nr: id of the property

Returns:

Type Description
Claim

statement of the given property number with the given value

Source code in ez_wikidata/wikidata.py
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
@staticmethod
def get_date_claim(
    date: Union[str, datetime.date, datetime.datetime], prop_nr: Union[str, int]
) -> Claim:
    """
    Get the date statement for the given date and property id

    The statement has DAY precision, so only the calendar day of the given
    value is used: time of day and timezone offset are dropped.

    Args:
        date: date value - a date, a datetime or a string parseable by dateutil
        prop_nr: id of the property

    Raises:
        Exception: if the value is neither a str, a date nor a datetime

    Returns:
        statement of the given property number with the given value
    """
    if isinstance(date, str):
        date = dateutil.parser.parse(date)
    # datetime is a subclass of date and therefore has to be checked first
    if isinstance(date, datetime.datetime):
        day = date.date()
    elif isinstance(date, datetime.date):
        day = date
    else:
        raise Exception(f"Value '{date}' can not be parsed to date")
    # a naive midnight timestamp keeps the "+<iso>Z" string well formed:
    # no microseconds and no utc offset in front of the trailing Z
    date_value = datetime.datetime.combine(day, datetime.time())
    date_string = f"+{date_value.isoformat()}Z"
    statement = Time(
        date_string, prop_nr=prop_nr, precision=WikibaseDatePrecision.DAY
    )
    return statement

get_item_label(item_id, lang=None)

Get the label for the given item id Args: item_id: id of the item lang: label language to return. Default is "en"

Returns:

Name Type Description
str Union[str, None]

label of the item

None Union[str, None]

If the label can not be determined or the item_id is None or can not be found

Source code in ez_wikidata/wikidata.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
def get_item_label(self, item_id: str, lang: str = None) -> typing.Union[str, None]:
    """
    Fetch the label of an item in the requested language.

    Args:
        item_id: id of the item
        lang: label language to return. Default is "en"

    Returns:
        str: label of the item
        None: if item_id is None or the item has no label in that language
    """
    if item_id is None:
        return None
    language = "en" if lang is None else lang
    label_entry = self.wbi.item.get(item_id).labels.get(language)
    return None if label_entry is None else label_entry.value

get_or_create_item(item_id)

Get or create the requested wikidata item Args: item_id: item to retrieve if None create a new item

Source code in ez_wikidata/wikidata.py
592
593
594
595
596
597
598
599
600
601
602
def get_or_create_item(self, item_id: typing.Union[str, None]) -> ItemEntity:
    """
    Return the existing item for an id or a new item when no id is given.

    Args:
        item_id: item to retrieve; if None or a blank string a new item is created
    """
    is_blank = isinstance(item_id, str) and item_id.strip() == ""
    if item_id is None or is_blank:
        return self.wbi.item.new()
    return self.wbi.item.get(item_id)

get_prop_value(record, pm, lang)

Retrieve the property value from the record and prepare the value if necessary Args: record: record containing the property data pm: property mapping lang: language to use

Returns:

Type Description
Any

value of the property from the record

Source code in ez_wikidata/wikidata.py
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
def get_prop_value(
    self, record: dict, pm: "PropertyMapping", lang: str
) -> typing.Any:
    """
    Retrieve the property value from the record and prepare the value if necessary

    The value of the mapped column is used; if the record has none the default
    value of the mapping applies. String values are stripped. If the mapping
    has a value lookup type and the value is not a Q-id yet, the value is
    resolved to a Q-id by name.

    Args:
        record: record containing the property data
        pm: property mapping
        lang: language to use

    Returns:
        value of the property from the record
    """
    value = record.get(pm.column, None)
    if value is None:
        value = pm.value
    if isinstance(value, str):
        # strip before the Q-id check so that padded ids such as " Q5 " are
        # recognized and labels are not searched with surrounding whitespace
        value = value.strip()
    if (
        value
        and pm.valueLookupType
        # only strings can be Q-ids or names - other values pass through
        and isinstance(value, str)
        and not self.is_wikidata_item_id(value)
    ):
        # find the wikidata item id of value
        value = self.getItemByName(value, pm.valueLookupType, lang)
    if isinstance(value, str):
        value = value.strip()
    return value

get_record(item_id, property_mappings, include_label=True, include_description=True, label_for_qids=False)

Get the properties from the given item Args: item_id: id of the item to get the data from property_mappings: list of property values to extract include_label: include_description: label_for_qids: If True fetch the label for a linked Qid Returns: dict with the property values

Source code in ez_wikidata/wikidata.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def get_record(
    self,
    item_id: str,
    property_mappings: Union[
        List[str], List["PropertyMapping"], typing.Dict[str, dict]
    ],
    include_label: bool = True,
    include_description: bool = True,
    label_for_qids: bool = False,
) -> dict:
    """
    Get the properties from the given item

    Args:
        item_id: id of the item to get the data from
        property_mappings: the properties to extract - plain property ids,
            PropertyMapping objects or a dict of mapping records
        include_label: if True add the english label under the key "label"
        include_description: if True add the english description under the
            key "description"
        label_for_qids: If True fetch the label for a linked Qid. Only applies
            to PropertyMapping entries that have a valueLookupType

    Returns:
        dict with the property values: a single value, a list of values or
        None per property, keyed by the mapping column (or the property id for
        plain ids), plus the qualifier values keyed by the qualifier column
    """
    item = self.wbi.item.get(item_id)
    lang = "en"
    if isinstance(property_mappings, dict):
        property_mappings = PropertyMapping.from_records(property_mappings)
    record = dict()
    if include_label:
        item_label = item.labels.get(lang)
        if item_label is not None:
            record["label"] = item_label.value
    if include_description:
        item_description = item.descriptions.get(lang)
        if item_description is not None:
            record["description"] = item_description.value
    qualifier_lookup = PropertyMapping.get_qualifier_lookup(property_mappings)
    # qualifiers are read together with the statement they belong to
    pms = [
        pm
        for pm in property_mappings
        if not isinstance(pm, PropertyMapping) or not pm.is_qualifier()
    ]
    for prop in pms:
        # plain property ids carry neither column name nor lookup type
        is_mapping = isinstance(prop, PropertyMapping)
        prop_id = prop.propertyId if is_mapping else prop
        prop_label = prop.column if is_mapping else prop
        statements = self._get_statements_by_pid(item, prop_id)
        values = []
        for statement in statements:
            value = self._get_statement_value(statement)
            if (
                label_for_qids
                # guard: a str property id has no valueLookupType attribute
                and is_mapping
                and prop.valueLookupType is not None
                and statement.mainsnak.datatype == "wikibase-item"
            ):
                label = self.get_item_label(value, lang)
                if label is not None:
                    value = label
            values.append(value)
            if is_mapping and prop.column in qualifier_lookup:
                for qualifier_pm in qualifier_lookup[prop.column]:
                    if qualifier_pm.propertyId in statement.qualifiers.qualifiers:
                        qualifier_statements = statement.qualifiers.get(
                            qualifier_pm.propertyId
                        )
                    else:
                        qualifier_statements = []
                    qualifier_values = [
                        self._get_statement_value(qualifier_statement)
                        for qualifier_statement in qualifier_statements
                    ]
                    # a single qualifier value is unwrapped from its list
                    record[qualifier_pm.column] = (
                        qualifier_values[0]
                        if len(qualifier_values) == 1
                        else qualifier_values
                    )
        if not values:
            record[prop_label] = None
        elif len(values) == 1:
            record[prop_label] = values[0]
        else:
            record[prop_label] = values
    return record

get_wddatatype_of_property(property_id) classmethod

Get the datatype of the given property Args: property_id: id of the property e.g. P31 or 31

Returns:

Type Description
WdDatatype

WdDatatype of the property or None if no datatype is defined

Source code in ez_wikidata/wikidata.py
769
770
771
772
773
774
775
776
777
778
779
780
@classmethod
def get_wddatatype_of_property(cls, property_id: Union[str, int]) -> "WdDatatype":
    """
    Resolve the datatype of a property to its WdDatatype.

    Args:
        property_id: id of the property e.g. P31 or 31

    Returns:
        the result of WdDatatype.get_by_wikibase for the wikibase datatype
        name of the property (the name is None if no datatype is defined)
    """
    return WdDatatype.get_by_wikibase(cls.get_datatype_of_property(property_id))

get_wikidata_item(qid_or_label, item_type_qid=None)

Get WikidataItem for given label or Qid

Parameters:

Name Type Description Default
qid_or_label str

label or Qid of a item

required

Returns:

Type Description
Optional[WikidataItem]

WikidataItem

Source code in ez_wikidata/wikidata.py
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
def get_wikidata_item(
    self, qid_or_label: str, item_type_qid: str = None
) -> typing.Optional["WikidataItem"]:
    """
    Get WikidataItem for given label or Qid

    Args:
        qid_or_label: label or Qid of an item
        item_type_qid: Qid of the item type used to resolve a label to a Qid

    Returns:
        WikidataItem: the item with its qid and label, or None if
            qid_or_label is None or no Qid could be determined
    """
    if qid_or_label is None:
        return None
    if self.is_wikidata_item_id(qid_or_label):
        # lookup label
        qid = qid_or_label
        label = self.get_item_label(qid)
    else:
        # lookup qid
        label = qid_or_label
        qid = self.getItemByName(label, item_type_qid)
    if qid is None:
        return None
    # label is not a constructor parameter of WikidataItem: a second
    # positional argument would be taken as the language, so set it explicitly
    item = WikidataItem(qid)
    item.label = label
    return item

is_wikidata_item_id(value) staticmethod

Returns true if the given value is a wikidata item id

Source code in ez_wikidata/wikidata.py
697
698
699
700
701
702
@staticmethod
def is_wikidata_item_id(value: str) -> bool:
    """
    Returns true if the given value is a wikidata item id
    """
    return bool(re.fullmatch(r"Q[0-9]+", value))

is_wikidata_property_id(value) staticmethod

Returns true if the given value is a wikidata property id

Source code in ez_wikidata/wikidata.py
704
705
706
707
708
709
@staticmethod
def is_wikidata_property_id(value: str) -> bool:
    """
    Returns true if the given value is a wikidata property id
    """
    return bool(re.fullmatch(r"P[0-9]+", value))

loginWithCredentials(user=None, pwd=None)

login using the given credentials or credentials retrieved via self.getCredentials

Parameters:

Name Type Description Default
user(str)

the username

required
pwd(str)

the password

required
Source code in ez_wikidata/wikidata.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def loginWithCredentials(self, user: str = None, pwd: str = None):
    """
    Log in with explicit credentials, falling back to self.getCredentials
    when no user is given. Without a user nothing happens.

    Args:
        user(str): the username
        pwd(str): the password
    """
    if user is None:
        user, pwd = self.getCredentials()
    if user is None:
        return
    self.login = wbi_login.Login(
        user=user, password=pwd, mediawiki_api_url=self.apiurl
    )
    # remember the user only when the login object is truthy
    if self.login:
        self.user = user

logout()

log the user out again

Source code in ez_wikidata/wikidata.py
184
185
186
187
188
189
190
def logout(self):
    """
    Forget the logged in user, the login and the wbi handle.
    """
    for attribute in ("user", "login", "wbi"):
        setattr(self, attribute, None)

normalize_records(record, prop_map)

Normalize given record by converting Qids to WikidataItem objects (lookup label) and find out Qid if label given based on the given prop_map

Source code in ez_wikidata/wikidata.py
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
def normalize_records(self, record: dict, prop_map: typing.List["PropertyMapping"]):
    """
    Replace the values of all item-typed columns of the record by
    WikidataItem objects: Qids get their label looked up, labels get their
    Qid looked up, based on the given prop_map. The record is changed in
    place and returned.
    """
    item_mappings = [
        mapping
        for mapping in prop_map
        if mapping.property_type_enum is WdDatatype.itemid
    ]
    for mapping in item_mappings:
        column = mapping.column
        if column is None or column == "":
            continue
        raw_value = record.get(column, None)
        # fall back to the default value of the mapping
        if raw_value is None and mapping.value is not None:
            raw_value = mapping.value
        lookup_type = mapping.valueLookupType
        if isinstance(raw_value, list):
            record[column] = [
                self.get_wikidata_item(entry, lookup_type) for entry in raw_value
            ]
        else:
            record[column] = self.get_wikidata_item(raw_value, lookup_type)
    return record

sanitize_label(label, limit=None, postfix=None) staticmethod

sanitize given label by ensuring it is not too long Args: label: label to sanitize limit: max length of the label

Returns:

Type Description
str

sanitized label

Source code in ez_wikidata/wikidata.py
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
@staticmethod
def sanitize_label(label: str, limit: int = None, postfix: str = None) -> str:
    """
    sanitize given label by ensuring it is not too long
    Args:
        label: label to sanitize
        limit: max length of the label

    Returns:
        sanitized label
    """
    if limit is None:
        limit = 250
    if postfix is None:
        postfix = "..."
    if label is not None and len(label) > limit:
        label = label[: limit - len(postfix)] + postfix
    return label

WikidataItem dataclass

Source code in ez_wikidata/wikidata.py
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
@dataclass
class WikidataItem:
    """
    a wikidata item identified by its Q-id

    If a SPARQL endpoint is given on construction, label and description
    are fetched from it and variable names are derived from the label.
    """

    # the Q-id of the item, e.g. "Q5"
    qid: str
    # plain label - not a constructor parameter, to be assigned after construction
    label: str = field(init=False, default=None)
    # language used for label and description lookups
    lang: str = "en"
    # optional endpoint used to fetch label and description on construction
    sparql: Optional[SPARQL] = None
    debug: bool = False
    description: str = field(init=False, default=None)
    # only set by __post_init__ when a qid is given
    url: str = field(init=False)

    def __eq__(self, other) -> bool:
        """
        WikidataItems are equal if the qid is equal
        """
        same = isinstance(other, WikidataItem) and self.qid == getattr(
            other, "qid", None
        )
        return same

    def __post_init__(self):
        """
        handle the construction

        derives url and numeric qid from the qid and - if a sparql endpoint
        is available - fetches label and description
        """
        if not self.qid:
            # normalize empty qids; url and qnumber stay unset in this case
            self.qid = None
            return
        self.url = f"https://www.wikidata.org/wiki/{self.qid}"
        # numeric qid
        self.qnumber = int(self.qid[1:])
        if self.sparql is not None:
            self.qlabel, self.description = WikidataItem.getLabelAndDescription(
                self.sparql, self.qid, self.lang, debug=self.debug
            )
            self.varname = Variable.validVarName(self.qlabel)
            self.itemVarname = f"{self.varname}Item"
            self.labelVarname = f"{self.varname}"

    def __str__(self):
        return self.asText(long=False)

    def asText(self, long: bool = True, wrapAt: int = 0):
        """
        returns my content as a text representation

        Args:
            long(bool): True if a long format including url is wished
            wrapAt(int): wrap long lines at the given width (if >0)

        Returns:
            str: a text representation of my content
        """
        text = self.qid or "❓"
        if hasattr(self, "qlabel"):
            text = f"{self.qlabel} ({self.qid})"
        # description is a field defaulting to None, so its presence says
        # nothing - only show a description that actually has content
        desc = getattr(self, "description", None)
        if desc:
            if wrapAt > 0:
                desc = textwrap.fill(desc, width=wrapAt)
            text += f"☞{desc}"
        if long and hasattr(self, "url"):
            text += f"→ {self.url}"
        return text

    @classmethod
    def getLabelAndDescription(
        cls, sparql: SPARQL, itemId: str, lang: str = "en", debug: bool = False
    ):
        """
        get  the label for the given item and language

        Args:
            sparql(SPARQL): the SPARQL endpoint to use
            itemId(str): the wikidata Q/P id
            lang(str): the language of the label
            debug(bool): if True output debug information

        Raises:
            Exception: if the query fails; the original error is chained

        Returns:
            (str,str): the label and description as a tuple
        """
        query = f"""# get the label for the given item
{Prefixes.getPrefixes(["rdfs","wd","schema"])}        
SELECT ?itemLabel ?itemDescription
WHERE
{{
  VALUES ?item {{
    wd:{itemId}
  }}
  ?item rdfs:label ?itemLabel.
  FILTER (LANG(?itemLabel) = "{lang}").
  ?item schema:description ?itemDescription.
  FILTER(LANG(?itemDescription) = "{lang}")
}}"""
        try:
            if debug:
                msg = f"getLabelAndDescription for wikidata Item {itemId} with query:\n{query}"
                print(msg)
            labelAndDescription = sparql.getValues(
                query, ["itemLabel", "itemDescription"]
            )
        except Exception as ex:
            msg = f"getLabelAndDescription failed for wikidata Item {itemId}:{str(ex)}"
            if debug:
                print(msg)
            # keep the original error as the cause for debugging
            raise Exception(msg) from ex
        return labelAndDescription

    @classmethod
    def getItemsByLabel(
        cls, sparql: SPARQL, itemLabel: str, lang: str = "en", debug: bool = False
    ) -> list:
        """
        get a Wikidata items by the given label

        Args:
            sparql(SPARQL): the SPARQL endpoint to use
            itemLabel(str): the label of the items
            lang(str): the language of the label
            debug(bool): if True show debugging information

        Returns:
            a list of potential items sorted by their numeric Q-id
        """
        # escape the characters that would terminate or corrupt the SPARQL
        # string literal (backslash first so the other escapes are not doubled)
        escapedLabel = (
            itemLabel.replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )
        valuesClause = f'   "{escapedLabel}"@{lang}\n'
        query = f"""# get the items that have the given label in the given language
# e.g. we'll find human=Q5 as the oldest type for the label "human" first
# and then the newer ones such as "race in Warcraft"
{Prefixes.getPrefixes(["rdfs","schema","xsd"])}
SELECT 
  #?itemId 
  ?item 
  ?itemLabel 
  ?itemDescription
WHERE {{ 
  VALUES ?itemLabel {{
    {valuesClause}
  }}
  #BIND (xsd:integer(SUBSTR(STR(?item),33)) AS ?itemId)
  ?item rdfs:label ?itemLabel. 
  ?item schema:description ?itemDescription.
  FILTER(LANG(?itemDescription)="{lang}")
}} 
#ORDER BY ?itemId"""
        qLod = sparql.queryAsListOfDicts(query)
        items = []
        for record in qLod:
            url = record["item"]
            qid = re.sub(r"http://www.wikidata.org/entity/(.*)", r"\1", url)
            item = WikidataItem(qid, debug=debug)
            # keep the entity url of the query result instead of the wiki url
            item.url = url
            item.qlabel = record["itemLabel"]
            item.varname = Variable.validVarName(item.qlabel)
            item.description = record["itemDescription"]
            items.append(item)
        sortedItems = sorted(items, key=lambda item: item.qnumber)
        return sortedItems

__eq__(other)

WikidataItems are equal if the qid is equal

Source code in ez_wikidata/wikidata.py
841
842
843
844
845
846
847
848
def __eq__(self, other) -> bool:
    """
    Two WikidataItems are equal when they carry the same qid;
    anything that is not a WikidataItem is never equal
    """
    if not isinstance(other, WikidataItem):
        return False
    return self.qid == getattr(other, "qid", None)

__post_init__()

handle the construction

Source code in ez_wikidata/wikidata.py
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
def __post_init__(self):
    """
    Finish the construction: derive url and numeric qid from the qid and,
    when a sparql endpoint is available, fetch label and description
    """
    if not self.qid:
        # normalize empty qids; url and qnumber stay unset in this case
        self.qid = None
        return
    self.url = f"https://www.wikidata.org/wiki/{self.qid}"
    # numeric qid
    self.qnumber = int(self.qid[1:])
    if self.sparql is None:
        return
    label_and_description = WikidataItem.getLabelAndDescription(
        self.sparql, self.qid, self.lang, debug=self.debug
    )
    self.qlabel, self.description = label_and_description
    self.varname = Variable.validVarName(self.qlabel)
    self.itemVarname = f"{self.varname}Item"
    self.labelVarname = f"{self.varname}"

asText(long=True, wrapAt=0)

returns my content as a text representation

Parameters:

Name Type Description Default
long(bool)

True if a long format including url is wished

required
wrapAt(int)

wrap long lines at the given width (if >0)

required

Returns:

Name Type Description
str

a text representation of my content

Source code in ez_wikidata/wikidata.py
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
def asText(self, long: bool = True, wrapAt: int = 0):
    """
    render me as a single text

    The text starts with "label (qid)" if a qlabel attribute is present,
    otherwise with the qid (or "❓" if the qid is empty). A description
    and - in long format - the url are appended when available.

    Args:
        long(bool): True if a long format including url is wished
        wrapAt(int): wrap long lines at the given width (if >0)

    Returns:
        str: a text representation of my content
    """
    fallback = self.qid or "❓"
    rendered = (
        f"{self.qlabel} ({self.qid})" if hasattr(self, "qlabel") else fallback
    )
    if hasattr(self, "description"):
        # only the description is wrapped, not the whole text
        description = (
            textwrap.fill(self.description, width=wrapAt)
            if wrapAt > 0
            else self.description
        )
        rendered += f"☞{description}"
    show_url = long and hasattr(self, "url")
    if show_url:
        rendered += f"→ {self.url}"
    return rendered

getItemsByLabel(sparql, itemLabel, lang='en', debug=False) classmethod

get the Wikidata items that have the given label

Parameters:

Name Type Description Default
sparql(SPARQL)

the SPARQL endpoint to use

required
itemLabel(str)

the label of the items

required
lang(str)

the language of the label

'en'
debug(bool)

if True show debugging information

False

Returns:

Type Description
list

a list of potential items

Source code in ez_wikidata/wikidata.py
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
    @classmethod
    def getItemsByLabel(
        cls, sparql: SPARQL, itemLabel: str, lang: str = "en", debug: bool = False
    ) -> list:
        """
        get a Wikidata items by the given label

        Args:
            sparql(SPARQL): the SPARQL endpoint to use
            itemLabel(str): the label of the items
            lang(str): the language of the label
            debug(bool): if True show debugging information

        Returns:
            a list of potential items
        """
        valuesClause = f'   "{itemLabel}"@{lang}\n'
        query = f"""# get the items that have the given label in the given language
# e.g. we'll find human=Q5 as the oldest type for the label "human" first
# and then the newer ones such as "race in Warcraft"
{Prefixes.getPrefixes(["rdfs","schema","xsd"])}
SELECT 
  #?itemId 
  ?item 
  ?itemLabel 
  ?itemDescription
WHERE {{ 
  VALUES ?itemLabel {{
    {valuesClause}
  }}
  #BIND (xsd:integer(SUBSTR(STR(?item),33)) AS ?itemId)
  ?item rdfs:label ?itemLabel. 
  ?item schema:description ?itemDescription.
  FILTER(LANG(?itemDescription)="{lang}")
}} 
#ORDER BY ?itemId"""
        qLod = sparql.queryAsListOfDicts(query)
        items = []
        for record in qLod:
            url = record["item"]
            qid = re.sub(r"http://www.wikidata.org/entity/(.*)", r"\1", url)
            item = WikidataItem(qid, debug=debug)
            item.url = url
            item.qlabel = record["itemLabel"]
            item.varname = Variable.validVarName(item.qlabel)
            item.description = record["itemDescription"]
            items.append(item)
        sortedItems = sorted(items, key=lambda item: item.qnumber)
        return sortedItems

getLabelAndDescription(sparql, itemId, lang='en', debug=False) classmethod

get the label and description for the given item and language

Parameters:

Name Type Description Default
itemId(str)

the wikidata Q/P id

required
lang(str)

the language of the label

'en'
debug(bool)

if True output debug information

False

Returns:

Type Description
(str, str)

the label and description as a tuple

Source code in ez_wikidata/wikidata.py
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
    @classmethod
    def getLabelAndDescription(
        cls, sparql: SPARQL, itemId: str, lang: str = "en", debug: bool = False
    ):
        """
        get  the label for the given item and language

        Args:
            itemId(str): the wikidata Q/P id
            lang(str): the language of the label
            debug(bool): if True output debug information

        Returns:
            (str,str): the label and description as a tuple
        """
        query = f"""# get the label for the given item
{Prefixes.getPrefixes(["rdfs","wd","schema"])}        
SELECT ?itemLabel ?itemDescription
WHERE
{{
  VALUES ?item {{
    wd:{itemId}
  }}
  ?item rdfs:label ?itemLabel.
  FILTER (LANG(?itemLabel) = "{lang}").
  ?item schema:description ?itemDescription.
  FILTER(LANG(?itemDescription) = "{lang}")
}}"""
        try:
            if debug:
                msg = f"getLabelAndDescription for wikidata Item {itemId} with query:\n{query}"
                print(msg)
            labelAndDescription = sparql.getValues(
                query, ["itemLabel", "itemDescription"]
            )
        except Exception as ex:
            msg = f"getLabelAndDescription failed for wikidata Item {itemId}:{str(ex)}"
            if debug:
                print(msg)
            raise Exception(msg)
        return labelAndDescription

WikidataResult dataclass

a class for handling a wikidata result

Source code in ez_wikidata/wikidata.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@dataclass
class WikidataResult:
    """
    a class for handling a wikidata result

    If no qid is given but an item is, the qid is taken from the item's id.
    """

    # the entity this result is about (if any)
    item: Optional[ItemEntity] = None
    # exceptions keyed by a string - presumably collected while
    # processing; verify the key semantics against the callers
    errors: Dict[str, Exception] = field(default_factory=dict)
    # the wikidata id; derived from item when not given
    qid: Optional[str] = None
    # optional message
    msg: Optional[str] = None
    # debug flag
    debug: Optional[bool] = False

    def __post_init__(self):
        """
        derive the qid from the item if no qid was given
        """
        if self.qid is None:
            if self.item:
                self.qid = self.item.id

    @property
    def pretty_item_json(self, indent: int = 2) -> str:
        """
        Returns a pretty-printed JSON string of the item.

        Without an item the qid is returned as is (which may be None).
        Since this is a property it is always evaluated with the
        default indent.
        """
        if not self.item:
            return self.qid
        # the result of get_json() is handed to json.dumps for serialization
        item_json = self.item.get_json()
        return json.dumps(item_json, indent=indent)

pretty_item_json: str property

Returns a pretty-printed JSON string of the item.