Skip to content

pylodentitymanager API Documentation

cache

Created on 2024-03-09.

@author: wf

refactored from https://github.com/WolfgangFahl/pyCEURmake/blob/main/ceurws/utils/json_cache.py by Tim Holzheim

Cache

Represents cache metadata and its file extension.

Attributes:

Name Type Description
name str

The name of the cache.

extension str

The file extension for the cache (e.g., '.json', '.yaml').

size int

The size of the cache file in bytes.

count Optional[int]

Optional; the number of items in the cache, if applicable.

count_attr str

the name of the attribute to determine the number of items, if applicable

last_accessed datetime

Optional; the last accessed timestamp of the cache.

Source code in lodentity/cache.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@lod_storable
class Cache:
    """Represents cache metadata and its file extension.

    Attributes:
        name: The name of the cache.
        extension: The file extension for the cache (e.g., '.json', '.yaml').
        count: Optional; the number of items in the cache, if applicable.
        count_attr: Optional; the name of the attribute used to determine
            the number of items, if applicable.
    """

    name: str
    extension: str
    # fix: was annotated plain `str` while defaulting to None
    count_attr: Optional[str] = None
    count: Optional[int] = None

    def set_path(self, base_path: str):
        """Set my path based on the given base_path and ensure the parent
        directory is created.

        Args:
            base_path (str): The base path where the directory should be created.
        """
        self.path = Path(f"{base_path}/{self.name}{self.extension}")
        # Ensure parent directory is created
        self.path.parent.mkdir(parents=True, exist_ok=True)

    @property
    def is_stored(self) -> bool:
        """Determines if the cache file exists and is not empty."""
        # > 1 byte guards against files that contain only e.g. a newline
        return self.path.is_file() and self.path.stat().st_size > 1

    @property
    def size(self) -> int:
        """The size of the cache file in bytes (0 if the file does not exist)."""
        cache_size = os.path.getsize(self.path) if os.path.isfile(self.path) else 0
        return cache_size

    @property
    def last_accessed(self) -> Optional[datetime]:
        """The last modification timestamp of the cache file, or None if the
        file does not exist.

        NOTE(review): despite the name, this uses getmtime (modification
        time), not atime — confirm that is intended.
        """
        cache_last_accessed = (
            datetime.fromtimestamp(os.path.getmtime(self.path))
            if os.path.isfile(self.path)
            else None
        )
        return cache_last_accessed

is_stored property

Determines if the cache file exists and is not empty.

set_path(base_path)

Set my path based on the given base_path and ensure the parent directory is created.

Parameters:

Name Type Description Default
base_path str

The base path where the directory should be created.

required
Source code in lodentity/cache.py
37
38
39
40
41
42
43
44
45
46
def set_path(self, base_path: str):
    """Derive and store my file path below the given base_path and make
    sure its parent directory exists.

    Args:
        base_path (str): The base path where the directory should be created.
    """
    filename = f"{self.name}{self.extension}"
    self.path = Path(f"{base_path}/{filename}")
    # make sure the directory the cache file lives in exists
    self.path.parent.mkdir(parents=True, exist_ok=True)

CacheManager

Manages multiple cache files with various extensions.

Attributes:

Name Type Description
name str

The name used for the base directory where cache files are stored.

caches Dict[str, Cache]

A dictionary to track each cache's metadata.

Source code in lodentity/cache.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@lod_storable
class CacheManager:
    """Manages multiple cache files with various extensions.

    Attributes:
        name: The name used for the base directory where cache files are stored.
        caches: A dictionary to track each cache's metadata,
            keyed by "<name><extension>".
    """

    name: str
    caches: Dict[str, Cache] = field(default_factory=dict)

    def __post_init__(self):
        # resolved lazily to the user's home directory in base_path()
        self.base_dir = None

    def base_path(self) -> str:
        """Fetches the base path for this cache manager, creating the
        directory if necessary.

        Returns:
            The base path: a hidden directory named after this manager,
            located under base_dir (defaulting to the user's home directory).
        """
        if self.base_dir is None:
            self.base_dir = os.path.expanduser("~")
        base_path = os.path.join(self.base_dir, f".{self.name}")
        os.makedirs(base_path, exist_ok=True)
        return base_path

    def get_cache_by_name(self, lod_name: str, ext: str = ".json") -> Cache:
        """Retrieves or creates a cache object by name and extension.

        Args:
            lod_name (str): The name of the cache to retrieve or create.
            ext (str): The file extension for the cache.

        Returns:
            Cache: An existing or newly created Cache object.
        """
        cache_key = lod_name + ext
        # fix: look up by the full key (name + extension) — the previous code
        # tested the bare name against keys that include the extension, so an
        # existing cache was never found and a KeyError could be raised
        if cache_key in self.caches:
            cache = self.caches[cache_key]
        else:
            cache = Cache(lod_name, ext)
            self.caches[cache_key] = cache
        base_path = self.base_path()
        cache.set_path(base_path)
        return cache

    def load(
        self,
        lod_name: str,
        ext: str = ".json",
        cls: Optional[Type[YamlAble]] = None,
        count_attr: Optional[str] = None,
    ) -> Union[List, Dict, None]:
        """Load data from a cache file. This method supports JSON and, if a
        relevant class is provided, other formats like YAML.

        Args:
            lod_name (str): The name of the list of dicts or class instances to read from cache.
            ext (str): The extension of the cache file, indicating the format (default is ".json").
            cls (Optional[Type[YamlAble]]): The class type for deserialization. This class must have
                load_from_json_file() or load_from_yaml_file() class methods, depending on the extension.
            count_attr (Optional[str]): the name of the attribute of the loaded result used
                to update cache.count, if applicable.

        Returns:
            Union[List, Dict, None]: A list of dicts, a list of class instances, a single dict,
            or None if the cache is not stored.

        Raises:
            ValueError: If the extension is unsupported, or YAML is requested without a suitable cls.
        """
        cache = self.get_cache_by_name(lod_name, ext)
        cache.count_attr = count_attr
        result = None
        if cache.is_stored:
            if ext == ".json":
                # fix: check for the JSON loader here — the previous code tested
                # for "load_from_yaml_file" in the JSON branch, so classes that
                # only provide load_from_json_file fell through to raw orjson
                if cls and hasattr(cls, "load_from_json_file"):
                    result = cls.load_from_json_file(cache.path)
                else:
                    with open(cache.path, encoding="utf-8") as json_file:
                        result = orjson.loads(json_file.read())
            elif ext == ".yaml":
                if cls and hasattr(cls, "load_from_yaml_file"):
                    result = cls.load_from_yaml_file(cache.path)
                else:
                    raise ValueError(
                        "YAML deserialization requires a cls parameter that is a subclass of YamlAble."
                    )
            else:
                raise ValueError(f"Unsupported file extension {ext} for loading.")

            # Dynamic count update based on count_attr if applicable
            if count_attr and hasattr(result, count_attr):
                cache.count = len(getattr(result, count_attr))
            elif isinstance(result, list):
                cache.count = len(result)

        return result

    def store(
        self,
        cache_name: str,
        data_to_store: Union[List, Dict],
        ext: str = ".json",
        count_attr: Optional[str] = None,
    ) -> Cache:
        """Stores data into a cache file, handling serialization based on the
        specified file extension. Supports JSON and YAML formats, and custom
        serialization for objects that provide specific serialization methods.

        Args:
            cache_name (str): The identifier for the cache where the data will be stored.
            data_to_store (Union[List, Dict]): The data to be stored in the cache. This can be a
                list of dictionaries, a single dictionary, or an object providing
                save_to_json_file()/save_to_yaml_file().
            ext (str): The file extension indicating the serialization format (e.g., '.json', '.yaml').
                Defaults to '.json'.
            count_attr (Optional[str]): the name of the attribute of data_to_store used
                to update cache.count, if applicable.

        Returns:
            Cache: the cache the data was stored to.

        Raises:
            ValueError: If the file extension is unsupported or if required serialization
                methods are not implemented by data_to_store.
        """
        cache = self.get_cache_by_name(cache_name, ext)
        cache.count_attr = count_attr
        cache.set_path(self.base_path())

        if ext == ".json":
            if isinstance(data_to_store, list):
                # plain list of dicts: serialize directly with orjson
                json_str = orjson.dumps(data_to_store, option=orjson.OPT_INDENT_2)
                with cache.path.open("wb") as json_file:
                    json_file.write(json_str)
            elif hasattr(data_to_store, "save_to_json_file"):
                # delegate to the object's own serialization method
                data_to_store.save_to_json_file(str(cache.path))
            else:
                raise ValueError(
                    "JSON serialization requires a 'save_to_json_file' method"
                )
        elif ext == ".yaml":
            if hasattr(data_to_store, "save_to_yaml_file"):
                data_to_store.save_to_yaml_file(str(cache.path))
            else:
                raise ValueError(
                    "YAML serialization requires a 'save_to_yaml_file' method."
                )
        else:
            raise ValueError(f"Unsupported file extension {ext}.")

        # Update cache metadata post storing
        if count_attr and hasattr(data_to_store, count_attr):
            cache.count = len(getattr(data_to_store, count_attr))
        elif isinstance(data_to_store, list):
            cache.count = len(data_to_store)

        return cache

base_path()

Fetches the base path for this cache manager.

Parameters:

None — the path is derived from the manager's name and its base directory
(defaulting to the user's home directory).

Returns:

Type Description
str

The base path

Source code in lodentity/cache.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def base_path(self) -> str:
    """Fetches the base path for this cache manager, creating the
    directory if necessary.

    (fix: the previous docstring documented a nonexistent `cache` argument —
    this method takes no parameters.)

    Returns:
        The base path: a hidden directory named after this manager,
        located under base_dir (defaulting to the user's home directory).
    """
    if self.base_dir is None:
        self.base_dir = os.path.expanduser("~")
    base_path = os.path.join(self.base_dir, f".{self.name}")
    os.makedirs(base_path, exist_ok=True)
    return base_path

get_cache_by_name(lod_name, ext='.json')

Retrieves or creates a cache object by name and extension.

Parameters:

Name Type Description Default
cache_name str

The name of the cache to retrieve or create.

required
ext str

The file extension for the cache.

'.json'

Returns:

Name Type Description
Cache Cache

An existing or newly created Cache object.

Source code in lodentity/cache.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def get_cache_by_name(self, lod_name: str, ext: str = ".json") -> Cache:
    """Retrieves or creates a cache object by name and extension.

    Args:
        lod_name (str): The name of the cache to retrieve or create.
        ext (str): The file extension for the cache.

    Returns:
        Cache: An existing or newly created Cache object.
    """
    cache_key = lod_name + ext
    # fix: look up by the full key (name + extension) — the previous code
    # tested the bare name against keys that include the extension, so an
    # existing cache was never found and a KeyError could be raised
    if cache_key in self.caches:
        cache = self.caches[cache_key]
    else:
        cache = Cache(lod_name, ext)
        self.caches[cache_key] = cache
    base_path = self.base_path()
    cache.set_path(base_path)
    return cache

load(lod_name, ext='.json', cls=None, count_attr=None)

Load data from a cache file. This method supports JSON and, if a relevant class is provided, other formats like YAML.

Parameters:

Name Type Description Default
lod_name str

The name of the list of dicts or class instances to read from cache.

required
ext str

The extension of the cache file, indicating the format (default is ".json").

'.json'
cls Optional[Type[YamlAble]]

The class type for deserialization. This class must have from_json() or from_yaml() class methods for deserialization, depending on the file extension.

None
count_attr Optional[str]

The name of the attribute of the loaded result used to update the cache count, if applicable.

None

Returns: Union[List, Dict, None]: A list of dicts, a list of class instances, a single dict, or None if the cache is not stored.

Source code in lodentity/cache.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def load(
    self,
    lod_name: str,
    ext: str = ".json",
    cls: Optional[Type[YamlAble]] = None,
    count_attr: Optional[str] = None,
) -> Union[List, Dict, None]:
    """Load data from a cache file. This method supports JSON and, if a
    relevant class is provided, other formats like YAML.

    Args:
        lod_name (str): The name of the list of dicts or class instances to read from cache.
        ext (str): The extension of the cache file, indicating the format (default is ".json").
        cls (Optional[Type[YamlAble]]): The class type for deserialization. This class must have
            load_from_json_file() or load_from_yaml_file() class methods, depending on the extension.
        count_attr (Optional[str]): the name of the attribute of the loaded result used
            to update cache.count, if applicable.

    Returns:
        Union[List, Dict, None]: A list of dicts, a list of class instances, a single dict,
        or None if the cache is not stored.

    Raises:
        ValueError: If the extension is unsupported, or YAML is requested without a suitable cls.
    """
    cache = self.get_cache_by_name(lod_name, ext)
    cache.count_attr = count_attr
    result = None
    if cache.is_stored:
        if ext == ".json":
            # fix: check for the JSON loader here — the previous code tested
            # for "load_from_yaml_file" in the JSON branch, so classes that
            # only provide load_from_json_file fell through to raw orjson
            if cls and hasattr(cls, "load_from_json_file"):
                result = cls.load_from_json_file(cache.path)
            else:
                with open(cache.path, encoding="utf-8") as json_file:
                    result = orjson.loads(json_file.read())
        elif ext == ".yaml":
            if cls and hasattr(cls, "load_from_yaml_file"):
                result = cls.load_from_yaml_file(cache.path)
            else:
                raise ValueError(
                    "YAML deserialization requires a cls parameter that is a subclass of YamlAble."
                )
        else:
            raise ValueError(f"Unsupported file extension {ext} for loading.")

        # Dynamic count update based on count_attr if applicable
        if count_attr and hasattr(result, count_attr):
            cache.count = len(getattr(result, count_attr))
        elif isinstance(result, list):
            cache.count = len(result)

    return result

store(cache_name, data_to_store, ext='.json', count_attr=None)

Stores data into a cache file, handling serialization based on the specified file extension. Supports JSON and YAML formats, and custom serialization for classes that provide specific serialization methods.

Parameters:

Name Type Description Default
cache_name str

The identifier for the cache where the data will be stored.

required
data_to_store Union[List, Dict]

The data to be stored in the cache. This can be a list of dictionaries, a single dictionary, or instances of data classes if cls is provided.

required
ext str

The file extension indicating the serialization format (e.g., '.json', '.yaml'). Defaults to '.json'.

'.json'
count_attr Optional[str]

The name of the attribute of data_to_store used to update the cache count, if applicable.

None

Raises:

Type Description
ValueError

If the file extension is unsupported or if required methods for serialization are not implemented in cls.

Source code in lodentity/cache.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def store(
    self,
    cache_name: str,
    data_to_store: Union[List, Dict],
    ext: str = ".json",
    count_attr: Optional[str] = None,
) -> Cache:
    """Stores data into a cache file, handling serialization based on the
    specified file extension. Supports JSON and YAML formats, and custom
    serialization for objects that provide specific serialization methods.

    Args:
        cache_name (str): The identifier for the cache where the data will be stored.
        data_to_store (Union[List, Dict]): The data to be stored in the cache. This can be a
            list of dictionaries, a single dictionary, or an object providing
            save_to_json_file()/save_to_yaml_file().
        ext (str): The file extension indicating the serialization format (e.g., '.json', '.yaml').
            Defaults to '.json'.
        count_attr (Optional[str]): the name of the attribute of data_to_store used
            to update cache.count, if applicable.

    Returns:
        Cache: the cache the data was stored to.

    Raises:
        ValueError: If the file extension is unsupported or if required serialization
            methods are not implemented by data_to_store.
    """
    cache = self.get_cache_by_name(cache_name, ext)
    cache.count_attr = count_attr
    cache.set_path(self.base_path())

    if ext == ".json":
        if isinstance(data_to_store, list):
            # plain list of dicts: serialize directly with orjson
            json_str = orjson.dumps(data_to_store, option=orjson.OPT_INDENT_2)
            with cache.path.open("wb") as json_file:
                json_file.write(json_str)
        elif hasattr(data_to_store, "save_to_json_file"):
            # delegate to the object's own serialization method
            data_to_store.save_to_json_file(str(cache.path))
        else:
            raise ValueError(
                "JSON serialization requires a 'save_to_json_file' method"
            )
    elif ext == ".yaml":
        if hasattr(data_to_store, "save_to_yaml_file"):
            data_to_store.save_to_yaml_file(str(cache.path))
        else:
            raise ValueError(
                "YAML serialization requires a 'save_to_yaml_file' method."
            )
    else:
        raise ValueError(f"Unsupported file extension {ext}.")

    # Update cache metadata post storing
    if count_attr and hasattr(data_to_store, count_attr):
        cache.count = len(getattr(data_to_store, count_attr))
    elif isinstance(data_to_store, list):
        cache.count = len(data_to_store)

    return cache

entity

Created on 2020-08-19.

@author: wf

EntityManager

Generic entity manager.

Source code in lodentity/entity.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
class EntityManager:
    """Generic entity manager.

    Keeps a list of entities of a given type and persists/restores it via
    the storage backend (JSON, JSONPICKLE, SQL or SPARQL) selected by the
    StorageConfig mode.
    """

    def __init__(
        self,
        name,
        entityName,
        entityPluralName: str,
        listName: str = None,
        clazz=None,
        tableName: str = None,
        primaryKey: str = None,
        config=None,
        handleInvalidListTypes=False,
        filterInvalidListTypes=False,
        listSeparator="⇹",
        debug=False,
    ):
        """Constructor.

        Args:
            name(string): name of this entity manager
            entityName(string): entityType to be managed e.g. Country
            entityPluralName(string): plural of the entityType e.g. Countries
            listName(string): name of the managed list - defaults to entityPluralName
            clazz(class): the class of the managed entities
            tableName(string): the SQL table name - defaults to entityName
            primaryKey(string): the primary key attribute/column, if any
            config(StorageConfig): the configuration to be used if None a default configuration will be used
            handleInvalidListTypes(bool): True if invalidListTypes should be converted or filtered
            filterInvalidListTypes(bool): True if invalidListTypes should be deleted
            listSeparator(str): the symbol to use as a list separator
            debug(boolean): override debug setting when default of config is used via config=None
        """
        self.name = name
        self.entityName = entityName
        self.entityPluralName = entityPluralName
        if listName is None:
            listName = entityPluralName
        if tableName is None:
            tableName = entityName
        self.primaryKey = primaryKey
        if config is None:
            config = StorageConfig.getDefault()
            if debug:
                config.debug = debug
        self.config = config
        # list handling is delegated to the superclass/mixin - presumably a
        # JSONAble list implementation; TODO confirm against the base class
        super(EntityManager, self).__init__(
            listName=listName,
            clazz=clazz,
            tableName=tableName,
            handleInvalidListTypes=handleInvalidListTypes,
            filterInvalidListTypes=filterInvalidListTypes,
        )
        cacheFile = self.getCacheFile(config=config, mode=config.mode)
        self.showProgress(
            "Creating %smanager(%s) for %s using cache %s"
            % (self.entityName, config.mode, self.name, cacheFile)
        )
        if config.mode is StoreMode.SPARQL:
            if config.endpoint is None:
                raise Exception("no endpoint set for mode sparql")
            self.endpoint = config.endpoint
            self.sparql = SPARQL(
                config.endpoint, debug=config.debug, profile=config.profile
            )
        elif config.mode is StoreMode.SQL:
            self.executeMany = False  # may be True when issues are fixed
        self.listSeparator = listSeparator

    def storeMode(self):
        """Return my store mode.

        Returns:
            StoreMode: the mode configured in my StorageConfig
        """
        return self.config.mode

    def showProgress(self, msg):
        """Display a progress message if progress display is enabled.

        Args:
          msg(string): the message to display
        """
        if self.config.withShowProgress:
            print(msg, flush=True)

    def getCacheFile(self, config=None, mode=StoreMode.SQL):
        """
        get the cache file for this entity manager
        Args:
            config(StorageConfig): if None get the cache for my mode
            mode(StoreMode): the storeMode to use

        Returns:
            str: the path to the cache file; for SPARQL and unknown modes
            a descriptive pseudo-path is returned instead
        """
        if config is None:
            config = self.config
        cachedir = config.getCachePath()
        # an explicitly configured cache file always takes precedence
        if config.cacheFile is not None:
            return config.cacheFile
        # derive the path to the file for my cached data from the mode
        if mode is StoreMode.JSON or mode is StoreMode.JSONPICKLE:
            extension = f".{mode.name.lower()}"
            cachepath = f"{cachedir}/{self.name}-{self.listName}{extension}"
        elif mode is StoreMode.SPARQL:
            # fix: label previously read "SPAQRL"
            cachepath = f"SPARQL {self.name}:{config.endpoint}"
        elif mode is StoreMode.SQL:
            cachepath = f"{cachedir}/{self.name}.db"
        else:
            cachepath = f"undefined cachepath for StoreMode {mode}"
        return cachepath

    def removeCacheFile(self):
        """Remove my cache file (JSON/JSONPICKLE modes only)."""
        mode = self.config.mode
        if mode is StoreMode.JSON or mode is StoreMode.JSONPICKLE:
            cacheFile = self.getCacheFile(mode=mode)
            if os.path.isfile(cacheFile):
                os.remove(cacheFile)

    def getSQLDB(self, cacheFile):
        """Get the SQL database for the given cacheFile.

        Args:
            cacheFile(string): the file to get the SQL db from
        """
        config = self.config
        # the connection is remembered in self.sqldb for reuse e.g. by storeLoD
        sqldb = self.sqldb = SQLDB(
            cacheFile, debug=config.debug, errorDebug=config.errorDebug
        )
        return sqldb

    def initSQLDB(
        self,
        sqldb,
        listOfDicts=None,
        withCreate: bool = True,
        withDrop: bool = True,
        sampleRecordCount=-1,
    ):
        """Initialize my sql DB.

        Args:
            sqldb(SQLDB): the database to create my entity table in
            listOfDicts(list): the list of dicts to analyze for type information
            withDrop(boolean): true if the existing Table should be dropped
            withCreate(boolean): true if the create Table command should be executed - false if only the entityInfo should be returned
            sampleRecordCount(int): the number of records to analyze for type information
        Return:
            EntityInfo: the entity information such as CREATE Table command
        """
        if listOfDicts is None:
            # fall back to the type samples declared on my entity class
            listOfDicts = JSONAble.getJsonTypeSamplesForClass(self.clazz)
        entityInfo = sqldb.createTable(
            listOfDicts,
            self.tableName,
            primaryKey=self.primaryKey,
            withCreate=withCreate,
            withDrop=withDrop,
            sampleRecordCount=sampleRecordCount,
        )
        return entityInfo

    def setNone(self, record, fields):
        """
        make sure the given fields in the given record are set to none
        Args:
            record(dict): the record to work on
            fields(list): the list of fields to set to None
        """
        LOD.setNone(record, fields)

    def isCached(self):
        """Check whether there is a file containing cached data for me."""
        result = False
        config = self.config
        mode = self.config.mode
        if mode is StoreMode.JSON or mode is StoreMode.JSONPICKLE:
            result = os.path.isfile(self.getCacheFile(config=self.config, mode=mode))
        elif mode is StoreMode.SPARQL:
            # @FIXME - make abstract
            query = (
                config.prefix
                + """
SELECT  ?source (COUNT(?source) AS ?sourcecount)
WHERE {
   ?event cr:Event_source ?source.
}
GROUP by ?source
"""
            )
            sourceCountList = self.sparql.queryAsListOfDicts(query)
            for sourceCount in sourceCountList:
                source = sourceCount["source"]
                recordCount = sourceCount["sourcecount"]
                if source == self.name and recordCount > 100:
                    result = True
        elif mode is StoreMode.SQL:
            cacheFile = self.getCacheFile(config=self.config, mode=StoreMode.SQL)
            if os.path.isfile(cacheFile):
                sqlQuery = f"SELECT COUNT(*) AS count FROM {self.tableName}"
                try:
                    sqlDB = self.getSQLDB(cacheFile)
                    countResults = sqlDB.query(sqlQuery)
                    countResult = countResults[0]
                    count = countResult["count"]
                    result = count >= 0
                except Exception as ex:
                    # e.g. sqlite3.OperationalError: no such table: Event_crossref
                    msg = str(ex)
                    # fix: was self.debug which is never assigned - use the config
                    if config.debug:
                        print(msg, file=sys.stderr)
                        sys.stderr.flush()
        else:
            # fix: was self.mode which is not an attribute of this class
            raise Exception("unsupported mode %s" % config.mode)
        return result

    def fromCache(
        self,
        force: bool = False,
        getListOfDicts=None,
        append=False,
        sampleRecordCount=-1,
    ):
        """Get my entries from the cache or from the callback provided.

        Args:
            force(bool): force ignoring the cache
            getListOfDicts(callable): a function to call for getting the data
            append(bool): True if records should be appended
            sampleRecordCount(int): the number of records to analyze for type information

        Returns:
            the list of Dicts and as a side effect setting self.cacheFile
        """
        if not self.isCached() or force:
            startTime = time.time()
            self.showProgress(f"getting {self.entityPluralName} for {self.name} ...")
            if getListOfDicts is None:
                if hasattr(self, "getListOfDicts"):
                    getListOfDicts = self.getListOfDicts
                else:
                    raise Exception(
                        "from Cache failed and no secondary cache via getListOfDicts specified"
                    )
            listOfDicts = getListOfDicts()
            duration = time.time() - startTime
            self.showProgress(
                f"got {len(listOfDicts)} {self.entityPluralName} in {duration:5.1f} s"
            )
            self.cacheFile = self.storeLoD(
                listOfDicts, append=append, sampleRecordCount=sampleRecordCount
            )
            self.setListFromLoD(listOfDicts)
        else:
            # fromStore also sets self.cacheFile
            listOfDicts = self.fromStore()
        return listOfDicts

    def fromStore(self, cacheFile=None, setList: bool = True) -> list:
        """
        restore me from the store
        Args:
            cacheFile(String): the cacheFile to use if None use the pre configured cachefile
            setList(bool): if True set my list with the data from the cache file

        Returns:
            list: list of dicts or JSON entitymanager
        """
        startTime = time.time()
        if cacheFile is None:
            cacheFile = self.getCacheFile(config=self.config, mode=self.config.mode)
        self.cacheFile = cacheFile
        self.showProgress(
            "reading %s for %s from cache %s"
            % (self.entityPluralName, self.name, cacheFile)
        )
        mode = self.config.mode
        if mode is StoreMode.JSONPICKLE:
            JSONem = JsonPickleMixin.readJsonPickle(cacheFile)
            # with a known class we can get a proper list of dicts
            if self.clazz is not None:
                listOfDicts = JSONem.getLoD()
            else:
                listOfDicts = JSONem.getList()
        elif mode is StoreMode.JSON:
            listOfDicts = self.readLodFromJsonFile(cacheFile)
        elif mode is StoreMode.SPARQL:
            # @FIXME make abstract
            eventQuery = (
                """
PREFIX cr: <http://cr.bitplan.com/>
SELECT ?eventId ?acronym ?series ?title ?year ?country ?city ?startDate ?endDate ?url ?source WHERE {
   OPTIONAL { ?event cr:Event_eventId ?eventId. }
   OPTIONAL { ?event cr:Event_acronym ?acronym. }
   OPTIONAL { ?event cr:Event_series ?series. }
   OPTIONAL { ?event cr:Event_title ?title. }
   OPTIONAL { ?event cr:Event_year ?year.  }
   OPTIONAL { ?event cr:Event_country ?country. }
   OPTIONAL { ?event cr:Event_city ?city. }
   OPTIONAL { ?event cr:Event_startDate ?startDate. }
   OPTIONAL { ?event cr:Event_endDate ?endDate. }
   OPTIONAL { ?event cr:Event_url ?url. }
   ?event cr:Event_source ?source FILTER(?source='%s').
}
"""
                % self.name
            )
            listOfDicts = self.sparql.queryAsListOfDicts(eventQuery)
        elif mode is StoreMode.SQL:
            sqlQuery = "SELECT * FROM %s" % self.tableName
            sqlDB = self.getSQLDB(cacheFile)
            listOfDicts = sqlDB.query(sqlQuery)
            sqlDB.close()
        else:
            # fix: was self.mode which is not an attribute of this class
            raise Exception("unsupported store mode %s" % self.config.mode)

        self.showProgress(
            "read %d %s from %s in %5.1f s"
            % (
                len(listOfDicts),
                self.entityPluralName,
                self.name,
                time.time() - startTime,
            )
        )
        if setList:
            self.setListFromLoD(listOfDicts)
        return listOfDicts

    def getLoD(self):
        """Return the LoD of the entities in the list.

        Return:
            list: a list of Dicts
        """
        lod = []
        for entity in self.getList():
            # TODO - optionally filter by samples
            lod.append(entity.__dict__)
        return lod

    def store(
        self,
        limit=10000000,
        batchSize=250,
        append=False,
        fixNone=True,
        sampleRecordCount=-1,
        replace: bool = False,
    ) -> str:
        """Store my list of dicts.

        Args:
            limit(int): maximum number of records to store per batch
            batchSize(int): size of batch for storing
            append(bool): True if records should be appended
            fixNone(bool): if True make sure the dicts are filled with None references for each record
            sampleRecordCount(int): the number of records to analyze for type information
            replace(bool): if True allow replace for insert

        Return:
            str: The cachefile being used
        """
        lod = self.getLoD()
        return self.storeLoD(
            lod,
            limit=limit,
            batchSize=batchSize,
            append=append,
            fixNone=fixNone,
            sampleRecordCount=sampleRecordCount,
            replace=replace,
        )

    def storeLoD(
        self,
        listOfDicts,
        limit=10000000,
        batchSize=250,
        cacheFile=None,
        append=False,
        fixNone=True,
        sampleRecordCount=1,
        replace: bool = False,
    ) -> str:
        """Store my entities.

        Args:
            listOfDicts(list): the list of dicts to store
            limit(int): maximum number of records to store
            batchSize(int): size of batch for storing
            cacheFile(string): the name of the storage e.g path to JSON or sqlite3 file
            append(bool): True if records should be appended
            fixNone(bool): if True make sure the dicts are filled with None references for each record
            sampleRecordCount(int): the number of records to analyze for type information
                NOTE(review): default 1 differs from store()'s -1 - confirm intended
            replace(bool): if True allow replace for insert
        Return:
            str: The cachefile being used
        """
        config = self.config
        mode = config.mode
        if self.handleInvalidListTypes:
            LOD.handleListTypes(
                lod=listOfDicts,
                doFilter=self.filterInvalidListTypes,
                separator=self.listSeparator,
            )
        if mode is StoreMode.JSON or mode is StoreMode.JSONPICKLE:
            if cacheFile is None:
                cacheFile = self.getCacheFile(config=self.config, mode=mode)
            self.showProgress(
                f"storing {len(listOfDicts)} {self.entityPluralName} for {self.name} to cache {cacheFile}"
            )
            if mode is StoreMode.JSONPICKLE:
                self.writeJsonPickle(cacheFile)
            if mode is StoreMode.JSON:
                self.storeToJsonFile(cacheFile)
        elif mode is StoreMode.SPARQL:
            startTime = time.time()
            msg = f"storing {len(listOfDicts)} {self.entityPluralName} to {self.config.mode} ({self.config.endpoint})"
            self.showProgress(msg)
            # @ FIXME make abstract /configurable
            entityType = "cr:Event"
            prefixes = self.config.prefix
            self.sparql.insertListOfDicts(
                listOfDicts,
                entityType,
                self.primaryKey,
                prefixes,
                limit=limit,
                batchSize=batchSize,
            )
            self.showProgress(
                "store for %s done after %5.1f secs"
                % (self.name, time.time() - startTime)
            )
        elif mode is StoreMode.SQL:
            startTime = time.time()
            if cacheFile is None:
                cacheFile = self.getCacheFile(config=self.config, mode=self.config.mode)
            sqldb = self.getSQLDB(cacheFile)
            self.showProgress(
                "storing %d %s for %s to %s:%s"
                % (
                    len(listOfDicts),
                    self.entityPluralName,
                    self.name,
                    config.mode,
                    cacheFile,
                )
            )
            # appending keeps the existing table; otherwise recreate it
            if append:
                withDrop = False
                withCreate = False
            else:
                withDrop = True
                withCreate = True
            entityInfo = self.initSQLDB(
                sqldb,
                listOfDicts,
                withCreate=withCreate,
                withDrop=withDrop,
                sampleRecordCount=sampleRecordCount,
            )
            self.sqldb.store(
                listOfDicts,
                entityInfo,
                executeMany=self.executeMany,
                fixNone=fixNone,
                replace=replace,
            )
            self.showProgress(
                "store for %s done after %5.1f secs"
                % (self.name, time.time() - startTime)
            )
        else:
            # fix: was self.mode which is not an attribute of this class
            raise Exception(f"unsupported store mode {self.config.mode}")
        return cacheFile

__init__(name, entityName, entityPluralName, listName=None, clazz=None, tableName=None, primaryKey=None, config=None, handleInvalidListTypes=False, filterInvalidListTypes=False, listSeparator='⇹', debug=False)

Constructor.

Parameters:

Name Type Description Default
name(string)

name of this entity manager

required
entityName(string)

entityType to be managed e.g. Country

required
entityPluralName(string)

plural of the entityType e.g. Countries

required
config(StorageConfig)

the configuration to be used if None a default configuration will be used

required
handleInvalidListTypes(bool)

True if invalidListTypes should be converted or filtered

required
filterInvalidListTypes(bool)

True if invalidListTypes should be deleted

required
listSeparator(str)

the symbol to use as a list separator

required
debug(boolean)

override debug setting when default of config is used via config=None

required
Source code in lodentity/entity.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(
    self,
    name,
    entityName,
    entityPluralName: str,
    listName: str = None,
    clazz=None,
    tableName: str = None,
    primaryKey: str = None,
    config=None,
    handleInvalidListTypes=False,
    filterInvalidListTypes=False,
    listSeparator="⇹",
    debug=False,
):
    """Set up this entity manager.

    Args:
        name(string): name of this entity manager
        entityName(string): entityType to be managed e.g. Country
        entityPluralName(string): plural of the entityType e.g. Countries
        config(StorageConfig): the configuration to be used if None a default configuration will be used
        handleInvalidListTypes(bool): True if invalidListTypes should be converted or filtered
        filterInvalidListTypes(bool): True if invalidListTypes should be deleted
        listSeparator(str): the symbol to use as a list separator
        debug(boolean): override debug setting when default of config is used via config=None
    """
    self.name = name
    self.entityName = entityName
    self.entityPluralName = entityPluralName
    # apply fallbacks for names that were not explicitly given
    effectiveListName = entityPluralName if listName is None else listName
    effectiveTableName = entityName if tableName is None else tableName
    self.primaryKey = primaryKey
    if config is None:
        config = StorageConfig.getDefault()
        if debug:
            config.debug = debug
    self.config = config
    super(EntityManager, self).__init__(
        listName=effectiveListName,
        clazz=clazz,
        tableName=effectiveTableName,
        handleInvalidListTypes=handleInvalidListTypes,
        filterInvalidListTypes=filterInvalidListTypes,
    )
    cacheFile = self.getCacheFile(config=config, mode=config.mode)
    creationMsg = "Creating %smanager(%s) for %s using cache %s" % (
        self.entityName,
        config.mode,
        self.name,
        cacheFile,
    )
    self.showProgress(creationMsg)
    if config.mode is StoreMode.SPARQL:
        if config.endpoint is None:
            raise Exception("no endpoint set for mode sparql")
        self.endpoint = config.endpoint
        self.sparql = SPARQL(
            config.endpoint, debug=config.debug, profile=config.profile
        )
    elif config.mode is StoreMode.SQL:
        # batch inserts stay disabled until the underlying issues are fixed
        self.executeMany = False
    self.listSeparator = listSeparator

fromCache(force=False, getListOfDicts=None, append=False, sampleRecordCount=-1)

Get my entries from the cache or from the callback provided.

Parameters:

Name Type Description Default
force(bool)

force ignoring the cache

required
getListOfDicts(callable)

a function to call for getting the data

required
append(bool)

True if records should be appended

required
sampleRecordCount(int)

the number of records to analyze for type information

required

Returns:

Type Description

the list of Dicts and as a side effect setting self.cacheFile

Source code in lodentity/entity.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def fromCache(
    self,
    force: bool = False,
    getListOfDicts=None,
    append=False,
    sampleRecordCount=-1,
):
    """Fetch my entries, preferring the cache unless forced.

    Args:
        force(bool): force ignoring the cache
        getListOfDicts(callable): a function to call for getting the data
        append(bool): True if records should be appended
        sampleRecordCount(int): the number of records to analyze for type information

    Returns:
        the list of Dicts and as a side effect setting self.cacheFile
    """
    useCache = self.isCached() and not force
    if useCache:
        # fromStore also sets self.cacheFile
        return self.fromStore()
    startTime = time.time()
    self.showProgress(f"getting {self.entityPluralName} for {self.name} ...")
    if getListOfDicts is None:
        if not hasattr(self, "getListOfDicts"):
            raise Exception(
                "from Cache failed and no secondary cache via getListOfDicts specified"
            )
        getListOfDicts = self.getListOfDicts
    lod = getListOfDicts()
    elapsed = time.time() - startTime
    self.showProgress(
        f"got {len(lod)} {self.entityPluralName} in {elapsed:5.1f} s"
    )
    self.cacheFile = self.storeLoD(
        lod, append=append, sampleRecordCount=sampleRecordCount
    )
    self.setListFromLoD(lod)
    return lod

fromStore(cacheFile=None, setList=True)

restore me from the store Args: cacheFile(String): the cacheFile to use if None use the pre configured cachefile setList(bool): if True set my list with the data from the cache file

Returns:

Name Type Description
list list

list of dicts or JSON entitymanager

Source code in lodentity/entity.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
    def fromStore(self, cacheFile=None, setList: bool = True) -> list:
        """
        restore me from the store
        Args:
            cacheFile(String): the cacheFile to use if None use the pre configured cachefile
            setList(bool): if True set my list with the data from the cache file

        Returns:
            list: list of dicts or JSON entitymanager
        """
        startTime = time.time()
        if cacheFile is None:
            cacheFile = self.getCacheFile(config=self.config, mode=self.config.mode)
        self.cacheFile = cacheFile
        self.showProgress(
            "reading %s for %s from cache %s"
            % (self.entityPluralName, self.name, cacheFile)
        )
        mode = self.config.mode
        if mode is StoreMode.JSONPICKLE:
            JSONem = JsonPickleMixin.readJsonPickle(cacheFile)
            if self.clazz is not None:
                listOfDicts = JSONem.getLoD()
            else:
                listOfDicts = JSONem.getList()
        elif mode is StoreMode.JSON:
            listOfDicts = self.readLodFromJsonFile(cacheFile)
            pass
        elif mode is StoreMode.SPARQL:
            # @FIXME make abstract
            eventQuery = (
                """
PREFIX cr: <http://cr.bitplan.com/>
SELECT ?eventId ?acronym ?series ?title ?year ?country ?city ?startDate ?endDate ?url ?source WHERE {
   OPTIONAL { ?event cr:Event_eventId ?eventId. }
   OPTIONAL { ?event cr:Event_acronym ?acronym. }
   OPTIONAL { ?event cr:Event_series ?series. }
   OPTIONAL { ?event cr:Event_title ?title. }
   OPTIONAL { ?event cr:Event_year ?year.  }
   OPTIONAL { ?event cr:Event_country ?country. }
   OPTIONAL { ?event cr:Event_city ?city. }
   OPTIONAL { ?event cr:Event_startDate ?startDate. }
   OPTIONAL { ?event cr:Event_endDate ?endDate. }
   OPTIONAL { ?event cr:Event_url ?url. }
   ?event cr:Event_source ?source FILTER(?source='%s').
}
"""
                % self.name
            )
            listOfDicts = self.sparql.queryAsListOfDicts(eventQuery)
        elif mode is StoreMode.SQL:
            sqlQuery = "SELECT * FROM %s" % self.tableName
            sqlDB = self.getSQLDB(cacheFile)
            listOfDicts = sqlDB.query(sqlQuery)
            sqlDB.close()
            pass
        else:
            raise Exception("unsupported store mode %s" % self.mode)

        self.showProgress(
            "read %d %s from %s in %5.1f s"
            % (
                len(listOfDicts),
                self.entityPluralName,
                self.name,
                time.time() - startTime,
            )
        )
        if setList:
            self.setListFromLoD(listOfDicts)
        return listOfDicts

getCacheFile(config=None, mode=StoreMode.SQL)

get the cache file for this event manager Args: config(StorageConfig): if None get the cache for my mode mode(StoreMode): the storeMode to use

Source code in lodentity/entity.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def getCacheFile(self, config=None, mode=StoreMode.SQL):
    """
    get the cache file for this entity manager
    Args:
        config(StorageConfig): if None get the cache for my mode
        mode(StoreMode): the storeMode to use

    Returns:
        str: the path to the cache file; for SPARQL and unknown modes a
        descriptive pseudo-path is returned instead
    """
    if config is None:
        config = self.config
    cachedir = config.getCachePath()
    # an explicitly configured cache file always takes precedence
    if config.cacheFile is not None:
        return config.cacheFile
    # derive the path to the file for my cached data from the mode
    if mode is StoreMode.JSON or mode is StoreMode.JSONPICKLE:
        extension = f".{mode.name.lower()}"
        cachepath = f"{cachedir}/{self.name}-{self.listName}{extension}"
    elif mode is StoreMode.SPARQL:
        # fix: label previously read "SPAQRL"
        cachepath = f"SPARQL {self.name}:{config.endpoint}"
    elif mode is StoreMode.SQL:
        cachepath = f"{cachedir}/{self.name}.db"
    else:
        cachepath = f"undefined cachepath for StoreMode {mode}"
    return cachepath

getLoD()

Return the LoD of the entities in the list.

Return

list: a list of Dicts

Source code in lodentity/entity.py
341
342
343
344
345
346
347
348
349
350
351
def getLoD(self):
    """Return the list of dicts for the entities in my list.

    Return:
        list: a list of Dicts
    """
    # TODO - optionally filter by samples
    return [entity.__dict__ for entity in self.getList()]

getSQLDB(cacheFile)

Get the SQL database for the given cacheFile.

Parameters:

Name Type Description Default
cacheFile(string)

the file to get the SQL db from

required
Source code in lodentity/entity.py
130
131
132
133
134
135
136
137
138
139
140
def getSQLDB(self, cacheFile):
    """Open the SQL database for the given cacheFile and remember it.

    Args:
        cacheFile(string): the file to get the SQL db from
    """
    cfg = self.config
    # keep the connection in self.sqldb so other methods can reuse it
    self.sqldb = SQLDB(cacheFile, debug=cfg.debug, errorDebug=cfg.errorDebug)
    return self.sqldb

initSQLDB(sqldb, listOfDicts=None, withCreate=True, withDrop=True, sampleRecordCount=-1)

Initialize my sql DB.

Parameters:

Name Type Description Default
listOfDicts(list)

the list of dicts to analyze for type information

required
withDrop(boolean)

true if the existing Table should be dropped

required
withCreate(boolean)

true if the create Table command should be executed - false if only the entityInfo should be returned

required
sampleRecordCount(int)

the number of records to analyze for type information

required

Return: EntityInfo: the entity information such as CREATE Table command

Source code in lodentity/entity.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def initSQLDB(
    self,
    sqldb,
    listOfDicts=None,
    withCreate: bool = True,
    withDrop: bool = True,
    sampleRecordCount=-1,
):
    """Initialize my sql DB.

    Args:
        sqldb(SQLDB): the database to create my entity table in
        listOfDicts(list): the list of dicts to analyze for type information
        withCreate(boolean): true if the create Table command should be executed - false if only the entityInfo should be returned
        withDrop(boolean): true if the existing Table should be dropped
        sampleRecordCount(int): the number of records to analyze for type information
    Return:
        EntityInfo: the entity information such as CREATE Table command
    """
    # without explicit records fall back to the class' declared type samples
    records = listOfDicts
    if records is None:
        records = JSONAble.getJsonTypeSamplesForClass(self.clazz)
    return sqldb.createTable(
        records,
        self.tableName,
        primaryKey=self.primaryKey,
        withCreate=withCreate,
        withDrop=withDrop,
        sampleRecordCount=sampleRecordCount,
    )

isCached()

Check whether there is a file containing cached data for me.

Source code in lodentity/entity.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
    def isCached(self):
        """Check whether there is a file containing cached data for me."""
        result = False
        config = self.config
        mode = self.config.mode
        if mode is StoreMode.JSON or mode is StoreMode.JSONPICKLE:
            result = os.path.isfile(self.getCacheFile(config=self.config, mode=mode))
        elif mode is StoreMode.SPARQL:
            # @FIXME - make abstract
            query = (
                config.prefix
                + """
SELECT  ?source (COUNT(?source) AS ?sourcecount)
WHERE {
   ?event cr:Event_source ?source.
}
GROUP by ?source
"""
            )
            sourceCountList = self.sparql.queryAsListOfDicts(query)
            for sourceCount in sourceCountList:
                source = sourceCount["source"]
                recordCount = sourceCount["sourcecount"]
                if source == self.name and recordCount > 100:
                    result = True
        elif mode is StoreMode.SQL:
            cacheFile = self.getCacheFile(config=self.config, mode=StoreMode.SQL)
            if os.path.isfile(cacheFile):
                sqlQuery = f"SELECT COUNT(*) AS count FROM {self.tableName}"
                try:
                    sqlDB = self.getSQLDB(cacheFile)
                    countResults = sqlDB.query(sqlQuery)
                    countResult = countResults[0]
                    count = countResult["count"]
                    result = count >= 0
                except Exception as ex:
                    msg = str(ex)
                    if self.debug:
                        print(msg, file=sys.stderr)
                        sys.stderr.flush()
                    # e.g. sqlite3.OperationalError: no such table: Event_crossref
                    pass
        else:
            raise Exception("unsupported mode %s" % self.mode)
        return result

removeCacheFile()

Remove my cache file.

Source code in lodentity/entity.py
122
123
124
125
126
127
128
def removeCacheFile(self):
    """Delete my cache file if my store mode keeps one and the file exists."""
    mode = self.config.mode
    if mode is StoreMode.JSON or mode is StoreMode.JSONPICKLE:
        cache_path = self.getCacheFile(mode=mode)
        if os.path.isfile(cache_path):
            os.remove(cache_path)

setNone(record, fields)

make sure the given fields in the given record are set to none Args: record(dict): the record to work on fields(list): the list of fields to set to None

Source code in lodentity/entity.py
172
173
174
175
176
177
178
179
def setNone(self, record, fields):
    """Ensure each of the given fields is set to None in the given record.

    Delegates to the LOD helper.

    Args:
        record(dict): the record to work on
        fields(list): the list of fields to set to None
    """
    LOD.setNone(record, fields)

showProgress(msg)

Display a progress message.

Parameters:

Name Type Description Default
msg(string)

the message to display

required
Source code in lodentity/entity.py
89
90
91
92
93
94
95
96
def showProgress(self, msg):
    """Print a progress message when progress display is configured.

    Args:
      msg(string): the message to display
    """
    if not self.config.withShowProgress:
        return
    print(msg, flush=True)

store(limit=10000000, batchSize=250, append=False, fixNone=True, sampleRecordCount=-1, replace=False)

Store my list of dicts.

Parameters:

Name Type Description Default
limit(int)

maximum number of records to store per batch

required
batchSize(int)

size of batch for storing

required
append(bool)

True if records should be appended

required
fixNone(bool)

if True make sure the dicts are filled with None references for each record

required
sampleRecordCount(int)

the number of records to analyze for type information

required
replace(bool)

if True allow replace for insert

required
Return

str: The cachefile being used

Source code in lodentity/entity.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def store(
    self,
    limit=10000000,
    batchSize=250,
    append=False,
    fixNone=True,
    sampleRecordCount=-1,
    replace: bool = False,
) -> str:
    """Store my list of dicts.

    Fetches my list of dicts and delegates the actual storing to storeLoD.

    Args:
        limit(int): maximum number of records to store per batch
        batchSize(int): size of batch for storing
        append(bool): True if records should be appended
        fixNone(bool): if True make sure the dicts are filled with None references for each record
        sampleRecordCount(int): the number of records to analyze for type information
        replace(bool): if True allow replace for insert

    Return:
        str: The cachefile being used
    """
    storeArgs = dict(
        limit=limit,
        batchSize=batchSize,
        append=append,
        fixNone=fixNone,
        sampleRecordCount=sampleRecordCount,
        replace=replace,
    )
    return self.storeLoD(self.getLoD(), **storeArgs)

storeLoD(listOfDicts, limit=10000000, batchSize=250, cacheFile=None, append=False, fixNone=True, sampleRecordCount=1, replace=False)

Store my entities.

Parameters:

Name Type Description Default
listOfDicts(list)

the list of dicts to store

required
limit(int)

maximum number of records to store

required
batchSize(int)

size of batch for storing

required
cacheFile(string)

the name of the storage e.g path to JSON or sqlite3 file

required
append(bool)

True if records should be appended

required
fixNone(bool)

if True make sure the dicts are filled with None references for each record

required
sampleRecordCount(int)

the number of records to analyze for type information

required
replace(bool)

if True allow replace for insert

required

Return: str: The cachefile being used

Source code in lodentity/entity.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
def storeLoD(
    self,
    listOfDicts,
    limit=10000000,
    batchSize=250,
    cacheFile=None,
    append=False,
    fixNone=True,
    sampleRecordCount=1,
    replace: bool = False,
) -> str:
    """Store my entities.

    Args:
        listOfDicts(list): the list of dicts to store
        limit(int): maximum number of records to store
        batchSize(int): size of batch for storing
        cacheFile(string): the name of the storage e.g path to JSON or sqlite3 file
        append(bool): True if records should be appended
        fixNone(bool): if True make sure the dicts are filled with None references for each record
        sampleRecordCount(int): the number of records to analyze for type information
        replace(bool): if True allow replace for insert

    Return:
        str: The cachefile being used

    Raises:
        Exception: if the configured store mode is not supported
    """
    config = self.config
    mode = config.mode
    if self.handleInvalidListTypes:
        # normalize/filter list-typed values before any storage backend sees them
        LOD.handleListTypes(
            lod=listOfDicts,
            doFilter=self.filterInvalidListTypes,
            separator=self.listSeparator,
        )
    if mode is StoreMode.JSON or mode is StoreMode.JSONPICKLE:
        if cacheFile is None:
            cacheFile = self.getCacheFile(config=self.config, mode=mode)
        self.showProgress(
            f"storing {len(listOfDicts)} {self.entityPluralName} for {self.name} to cache {cacheFile}"
        )
        if mode is StoreMode.JSONPICKLE:
            self.writeJsonPickle(cacheFile)
        if mode is StoreMode.JSON:
            self.storeToJsonFile(cacheFile)
    elif mode is StoreMode.SPARQL:
        startTime = time.time()
        msg = f"storing {len(listOfDicts)} {self.entityPluralName} to {self.config.mode} ({self.config.endpoint})"
        self.showProgress(msg)
        # @ FIXME make abstract /configurable
        entityType = "cr:Event"
        prefixes = self.config.prefix
        self.sparql.insertListOfDicts(
            listOfDicts,
            entityType,
            self.primaryKey,
            prefixes,
            limit=limit,
            batchSize=batchSize,
        )
        self.showProgress(
            "store for %s done after %5.1f secs"
            % (self.name, time.time() - startTime)
        )
    elif mode is StoreMode.SQL:
        startTime = time.time()
        if cacheFile is None:
            cacheFile = self.getCacheFile(config=self.config, mode=self.config.mode)
        sqldb = self.getSQLDB(cacheFile)
        self.showProgress(
            "storing %d %s for %s to %s:%s"
            % (
                len(listOfDicts),
                self.entityPluralName,
                self.name,
                config.mode,
                cacheFile,
            )
        )
        if append:
            withDrop = False
            withCreate = False
        else:
            # a fresh (non-append) store recreates the table from scratch
            withDrop = True
            withCreate = True
        entityInfo = self.initSQLDB(
            sqldb,
            listOfDicts,
            withCreate=withCreate,
            withDrop=withDrop,
            sampleRecordCount=sampleRecordCount,
        )
        self.sqldb.store(
            listOfDicts,
            entityInfo,
            executeMany=self.executeMany,
            fixNone=fixNone,
            replace=replace,
        )
        self.showProgress(
            "store for %s done after %5.1f secs"
            % (self.name, time.time() - startTime)
        )
    else:
        # bug fix: use the local mode - there is no self.mode attribute;
        # the store mode lives in self.config.mode
        raise Exception(f"unsupported store mode {mode}")
    return cacheFile

storeMode()

Return my store mode.

Source code in lodentity/entity.py
85
86
87
def storeMode(self):
    """Return my store mode.

    Returns:
        the mode taken from my configuration
    """
    mode = self.config.mode
    return mode

jsonable

This module has a class JSONAble for serialization of tables/list of dicts to and from JSON encoding.

Created on 2020-09-03

@author: wf

JSONAble

Bases: object

Mixin to allow classes to be JSON serializable see.

  • https://stackoverflow.com/questions/3768895/how-to-make-a-class-json-serializable
Source code in lodentity/jsonable.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
class JSONAble(object):
    """Mixin to allow classes to be JSON serializable see.

    - https://stackoverflow.com/questions/3768895/how-to-make-a-class-json-serializable
    """

    def __init__(self):
        """Constructor."""

    @classmethod
    def getPluralname(cls):
        # naive English pluralization: simply append "s" to the class name
        return "%ss" % cls.__name__

    @staticmethod
    def singleQuoteToDoubleQuote(singleQuoted, useRegex=False):
        """Convert a single quoted string to a double quoted one.

        Args:
            singleQuoted (str): a single quoted string e.g.

                .. highlight:: json

                {'cities': [{'name': "Upper Hell's Gate"}]}

            useRegex (boolean): True if a regular expression shall be used for matching

        Returns:
            string: the double quoted version of the string

        Note:
            see
            - https://stackoverflow.com/questions/55600788/python-replace-single-quotes-with-double-quotes-but-leave-ones-within-double-q
        """
        if useRegex:
            doubleQuoted = JSONAble.singleQuoteToDoubleQuoteUsingRegex(singleQuoted)
        else:
            doubleQuoted = JSONAble.singleQuoteToDoubleQuoteUsingBracketLoop(
                singleQuoted
            )
        return doubleQuoted

    @staticmethod
    def singleQuoteToDoubleQuoteUsingRegex(singleQuoted):
        """Convert a single quoted string to a double quoted one using a
        regular expression.

        Args:
            singleQuoted(string): a single quoted string e.g. {'cities': [{'name': "Upper Hell's Gate"}]}
        Returns:
            string: the double quoted version of the string
        Note:
            see https://stackoverflow.com/a/50257217/1497139
        """
        doubleQuoted = JSONAbleSettings.singleQuoteRegex.sub('"', singleQuoted)
        return doubleQuoted

    @staticmethod
    def singleQuoteToDoubleQuoteUsingBracketLoop(singleQuoted):
        """Convert a single quoted string to a double quoted one using a
        character loop that tracks the quoting state.

        Args:
            singleQuoted(string): a single quoted string e.g. {'cities': [{'name': "Upper Hell's Gate"}]}
        Returns:
            string: the double quoted version of the string
        Note:
            see https://stackoverflow.com/a/63862387/1497139
        """
        cList = list(singleQuoted)
        inDouble = False
        inSingle = False
        for i, c in enumerate(cList):
            # print ("%d:%s %r %r" %(i,c,inSingle,inDouble))
            if c == "'":
                # only convert single quotes outside a double quoted region
                if not inDouble:
                    inSingle = not inSingle
                    cList[i] = '"'
            elif c == '"':
                inDouble = not inDouble
                inSingle = False
        doubleQuoted = "".join(cList)
        return doubleQuoted

    def getJsonTypeSamples(self):
        """Does my class provide a "getSamples" method?"""
        if hasattr(self, "__class__"):
            cls = self.__class__
            # a JSONAbleList without own samples delegates to its element class
            if isinstance(self, JSONAbleList) and not hasattr(cls, "getSamples"):
                cls = self.clazz
            return JSONAble.getJsonTypeSamplesForClass(cls)
        return None

    @staticmethod
    def getJsonTypeSamplesForClass(cls):
        """Return the type samples for the given class.

        Return:
            list: a list of dict that specify the types by example
        """
        if hasattr(cls, "getSamples"):
            getSamples = getattr(cls, "getSamples")
            if callable(getSamples):
                return getSamples()
        return None

    @staticmethod
    def readJsonFromFile(jsonFilePath):
        """Read json string from the given jsonFilePath.

        Args:
            jsonFilePath(string): the path of the file where to read the result from

        Returns:
            the JSON string read from the file
        """
        with open(jsonFilePath, "r") as jsonFile:
            jsonStr = jsonFile.read()
        return jsonStr

    @staticmethod
    def storeJsonToFile(jsonStr, jsonFilePath):
        """Store the given json string to the given jsonFilePath.

        Args:
            jsonStr(string): the string to store
            jsonFilePath(string): the path of the file where to store the result
        """
        with open(jsonFilePath, "w") as jsonFile:
            jsonFile.write(jsonStr)

    def checkExtension(self, jsonFile: str, extension: str = ".json") -> str:
        """Make sure the jsonFile has the given extension e.g. ".json".

        Args:
            jsonFile(str): the jsonFile name - potentially without ".json" suffix
            extension(str): the extension to enforce, default ".json"

        Returns:
            str: the jsonFile name with the extension guaranteed
        """
        if not jsonFile.endswith(extension):
            jsonFile = f"{jsonFile}{extension}"
        return jsonFile

    def storeToJsonFile(
        self, jsonFile: str, extension: str = ".json", limitToSampleFields: bool = False
    ):
        """Store me to the given jsonFile.

        Args:
            jsonFile(str): the JSON file name (optionally without extension)
            extension(str): the extension to use if not part of the jsonFile name
            limitToSampleFields(bool): If True the returned JSON is limited to the attributes/fields that are present in the samples. Otherwise all attributes of the object will be included. Default is False.
        """
        jsonFile = self.checkExtension(jsonFile, extension)
        JSONAble.storeJsonToFile(self.toJSON(limitToSampleFields), jsonFile)

    def restoreFromJsonFile(self, jsonFile: str):
        """Restore me from the given jsonFile.

        Args:
            jsonFile(string): the jsonFile to restore me from
        """
        jsonFile = self.checkExtension(jsonFile)
        jsonStr = JSONAble.readJsonFromFile(jsonFile)
        self.fromJson(jsonStr)

    def fromJson(self, jsonStr):
        """Initialize me from the given JSON string.

        Args:
            jsonStr(str): the JSON string
        """
        jsonMap = json.loads(jsonStr)
        self.fromDict(jsonMap)

    def fromDict(self, data: dict):
        """Initialize me from the given data.

        Args:
            data(dict): the dictionary to initialize me from
        """
        # https://stackoverflow.com/questions/38987/how-do-i-merge-two-dictionaries-in-a-single-expression-in-python-taking-union-o
        for key in data.keys():
            value = data[key]
            setattr(self, key, value)

    def toJsonAbleValue(self, v):
        """
        return the JSON able value of the given value v

        Values that are neither objects nor dates are mapped to an
        empty string.

        Args:
            v(object): the value to convert
        """
        # objects have __dict__ hash tables which can be JSON-converted
        if hasattr(v, "__dict__"):
            return v.__dict__
        elif isinstance(v, datetime.datetime):
            return v.isoformat()
        elif isinstance(v, datetime.date):
            return v.isoformat()
        else:
            return ""

    def toJSON(self, limitToSampleFields: bool = False):
        """Return a recursive JSON dump of my attributes.

        Args:
            limitToSampleFields(bool): If True the returned JSON is limited to the attributes/fields that are present in the samples. Otherwise all attributes of the object will be included. Default is False.

        Returns:
            a recursive JSON dump of the dicts of my objects
        """
        data = {}
        if limitToSampleFields:
            samples = self.getJsonTypeSamples()
            sampleFields = LOD.getFields(samples)
            if isinstance(self, JSONAbleList):
                limitedRecords = []
                for record in self.__dict__[self.listName]:
                    limitedRecord = {}
                    for key, value in record.__dict__.items():
                        if key in sampleFields:
                            limitedRecord[key] = value
                    limitedRecords.append(limitedRecord)
                data[self.listName] = limitedRecords
            else:
                for key, value in self.__dict__.items():
                    if key in sampleFields:
                        data[key] = value
        else:
            data = self
        jsonStr = json.dumps(
            data,
            default=lambda v: self.toJsonAbleValue(v),
            sort_keys=True,
            indent=JSONAbleSettings.indent,
        )
        return jsonStr

    def getJSONValue(self, v):
        """Get the value of the given v as JSON.

        Args:
            v(object): the value to get

        Returns:
            the value, making sure objects are returned as dicts
        """
        if hasattr(v, "asJSON"):
            return v.asJSON(asString=False)
        elif type(v) is dict:
            return self.reprDict(v)
        elif type(v) is list:
            vlist = []
            for vitem in v:
                vlist.append(self.getJSONValue(vitem))
            return vlist
        elif isinstance(v, datetime.datetime):
            return v.isoformat()
        elif isinstance(v, datetime.date):
            return v.isoformat()
        elif isinstance(v, bool):
            # convert True,False to -> true,false
            return str(v).lower()
        else:
            return v

    def reprDict(self, srcDict):
        """Get the given srcDict as new dict with fields being converted with
        getJSONValue.

        Args:
            srcDict(dict): the source dictionary

        Returns
            dict: the converted dictionary
        """
        d = dict()
        for a, v in srcDict.items():
            d[a] = self.getJSONValue(v)
        return d

    def asJSON(self, asString=True, data=None):
        """Recursively return my dict elements.

        Args:
            asString(boolean): if True return my result as a string
        """
        if data is None:
            data = self.__dict__
        jsonDict = self.reprDict(data)
        if asString:
            jsonStr = str(jsonDict)
            jsonStr = JSONAble.singleQuoteToDoubleQuote(jsonStr)
            return jsonStr
        return jsonDict

__init__()

Constructor.

Source code in lodentity/jsonable.py
37
38
def __init__(self):
    """Constructor."""
    # no instance state is set up here - JSONAble only contributes behavior

asJSON(asString=True, data=None)

Recursively return my dict elements.

Parameters:

Name Type Description Default
asString(boolean)

if True return my result as a string

required
Source code in lodentity/jsonable.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def asJSON(self, asString=True, data=None):
    """Recursively return my dict elements.

    Args:
        asString(boolean): if True return my result as a string
        data: optional dict to convert instead of my __dict__
    """
    source = self.__dict__ if data is None else data
    jsonDict = self.reprDict(source)
    if not asString:
        return jsonDict
    return JSONAble.singleQuoteToDoubleQuote(str(jsonDict))

checkExtension(jsonFile, extension='.json')

Make sure the jsonFile has the given extension e.g. ".json".

Parameters:

Name Type Description Default
jsonFile(str)

the jsonFile name - potentially without ".json" suffix

required

Returns:

Name Type Description
str str

the jsonFile name with ".json" as an extension guaranteed

Source code in lodentity/jsonable.py
163
164
165
166
167
168
169
170
171
172
173
174
def checkExtension(self, jsonFile: str, extension: str = ".json") -> str:
    """Make sure the jsonFile has the given extension e.g. ".json".

    Args:
        jsonFile(str): the jsonFile name - potentially without ".json" suffix
        extension(str): the extension to enforce, default ".json"

    Returns:
        str: the jsonFile name with the extension guaranteed
    """
    hasSuffix = jsonFile.endswith(extension)
    return jsonFile if hasSuffix else f"{jsonFile}{extension}"

fromDict(data)

Initialize me from the given data.

Parameters:

Name Type Description Default
data(dict)

the dictionary to initialize me from

required
Source code in lodentity/jsonable.py
208
209
210
211
212
213
214
215
216
217
def fromDict(self, data: dict):
    """Initialize me from the given data.

    Args:
        data(dict): the dictionary to initialize me from
    """
    # set each key/value pair as an attribute on me
    for key, value in data.items():
        setattr(self, key, value)

fromJson(jsonStr)

Initialize me from the given JSON string.

Parameters:

Name Type Description Default
jsonStr(str)

the JSON string

required
Source code in lodentity/jsonable.py
199
200
201
202
203
204
205
206
def fromJson(self, jsonStr):
    """Initialize me from the given JSON string.

    Args:
        jsonStr(str): the JSON string
    """
    self.fromDict(json.loads(jsonStr))

getJSONValue(v)

Get the value of the given v as JSON.

Parameters:

Name Type Description Default
v(object)

the value to get

required

Returns:

Type Description

the value, making sure objects are returned as dicts

Source code in lodentity/jsonable.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def getJSONValue(self, v):
    """Get the value of the given v as JSON.

    Args:
        v(object): the value to get

    Returns:
        the value, making sure objects are returned as dicts
    """
    if hasattr(v, "asJSON"):
        return v.asJSON(asString=False)
    if type(v) is dict:
        return self.reprDict(v)
    if type(v) is list:
        return [self.getJSONValue(item) for item in v]
    if isinstance(v, (datetime.datetime, datetime.date)):
        return v.isoformat()
    if isinstance(v, bool):
        # convert True,False to -> true,false
        return str(v).lower()
    return v

getJsonTypeSamples()

Does my class provide a "getSamples" method?

Source code in lodentity/jsonable.py
116
117
118
119
120
121
122
123
def getJsonTypeSamples(self):
    """Does my class provide a "getSamples" method?"""
    if not hasattr(self, "__class__"):
        return None
    cls = self.__class__
    # a JSONAbleList without own samples delegates to its element class
    if isinstance(self, JSONAbleList) and not hasattr(cls, "getSamples"):
        cls = self.clazz
    return JSONAble.getJsonTypeSamplesForClass(cls)

getJsonTypeSamplesForClass() staticmethod

Return the type samples for the given class.

Return

list: a list of dict that specify the types by example

Source code in lodentity/jsonable.py
125
126
127
128
129
130
131
132
133
134
135
136
@staticmethod
def getJsonTypeSamplesForClass(cls):
    """Return the type samples for the given class.

    Return:
        list: a list of dict that specify the types by example
    """
    if hasattr(cls, "getSamples"):
        getSamples = getattr(cls, "getSamples")
        if callable(getSamples):
            return getSamples()
    return None

readJsonFromFile(jsonFilePath) staticmethod

Read json string from the given jsonFilePath.

Parameters:

Name Type Description Default
jsonFilePath(string)

the path of the file where to read the result from

required

Returns:

Type Description

the JSON string read from the file

Source code in lodentity/jsonable.py
138
139
140
141
142
143
144
145
146
147
148
149
150
@staticmethod
def readJsonFromFile(jsonFilePath):
    """Read json string from the given jsonFilePath.

    Args:
        jsonFilePath(string): the path of the file where to read the result from

    Returns:
        the JSON string read from the file
    """
    with open(jsonFilePath, "r") as jsonFile:
        jsonStr = jsonFile.read()
    return jsonStr

reprDict(srcDict)

Get the given srcDict as new dict with fields being converted with getJSONValue.

Parameters:

Name Type Description Default
srcDict(dict)

the source dictionary

required

Returns dict: the converted dictionary

Source code in lodentity/jsonable.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def reprDict(self, srcDict):
    """Get the given srcDict as new dict with fields being converted with
    getJSONValue.

    Args:
        srcDict(dict): the source dictionary

    Returns
        dict: the converted dictionary
    """
    return {attr: self.getJSONValue(value) for attr, value in srcDict.items()}

restoreFromJsonFile(jsonFile)

Restore me from the given jsonFile.

Parameters:

Name Type Description Default
jsonFile(string)

the jsonFile to restore me from

required
Source code in lodentity/jsonable.py
189
190
191
192
193
194
195
196
197
def restoreFromJsonFile(self, jsonFile: str):
    """Restore me from the given jsonFile.

    Args:
        jsonFile(string): the jsonFile to restore me from
    """
    checkedFile = self.checkExtension(jsonFile)
    self.fromJson(JSONAble.readJsonFromFile(checkedFile))

singleQuoteToDoubleQuote(singleQuoted, useRegex=False) staticmethod

Convert a single quoted string to a double quoted one.

Parameters:

Name Type Description Default
singleQuoted str

a single quoted string e.g.

.. highlight:: json

{'cities': [{'name': "Upper Hell's Gate"}]}

required
useRegex boolean

True if a regular expression shall be used for matching

False

Returns:

Name Type Description
string

the double quoted version of the string

Note

see - https://stackoverflow.com/questions/55600788/python-replace-single-quotes-with-double-quotes-but-leave-ones-within-double-q

Source code in lodentity/jsonable.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@staticmethod
def singleQuoteToDoubleQuote(singleQuoted, useRegex=False):
    """Convert a single quoted string to a double quoted one.

    Args:
        singleQuoted (str): a single quoted string e.g.

            .. highlight:: json

            {'cities': [{'name': "Upper Hell's Gate"}]}

        useRegex (boolean): True if a regular expression shall be used for matching

    Returns:
        string: the double quoted version of the string

    Note:
        see
        - https://stackoverflow.com/questions/55600788/python-replace-single-quotes-with-double-quotes-but-leave-ones-within-double-q
    """
    converter = (
        JSONAble.singleQuoteToDoubleQuoteUsingRegex
        if useRegex
        else JSONAble.singleQuoteToDoubleQuoteUsingBracketLoop
    )
    return converter(singleQuoted)

singleQuoteToDoubleQuoteUsingBracketLoop(singleQuoted) staticmethod

Convert a single quoted string to a double quoted one using a character loop that tracks the quoting state.

Parameters:

Name Type Description Default
singleQuoted(string)

a single quoted string e.g. {'cities': [{'name': "Upper Hell's Gate"}]}

required
useRegex(boolean)

True if a regular expression shall be used for matching

required

Returns: string: the double quoted version of the string e.g. Note: see https://stackoverflow.com/a/63862387/1497139

Source code in lodentity/jsonable.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@staticmethod
def singleQuoteToDoubleQuoteUsingBracketLoop(singleQuoted):
    """Convert a single quoted string to a double quoted one using a
    regular expression.

    Args:
        singleQuoted(string): a single quoted string e.g. {'cities': [{'name': "Upper Hell's Gate"}]}
        useRegex(boolean): True if a regular expression shall be used for matching
    Returns:
        string: the double quoted version of the string e.g.
    Note:
        see https://stackoverflow.com/a/63862387/1497139
    """
    cList = list(singleQuoted)
    inDouble = False
    inSingle = False
    for i, c in enumerate(cList):
        # print ("%d:%s %r %r" %(i,c,inSingle,inDouble))
        if c == "'":
            if not inDouble:
                inSingle = not inSingle
                cList[i] = '"'
        elif c == '"':
            inDouble = not inDouble
            inSingle = False
    doubleQuoted = "".join(cList)
    return doubleQuoted

singleQuoteToDoubleQuoteUsingRegex(singleQuoted) staticmethod

Convert a single quoted string to a double quoted one using a regular expression.

Parameters:

Name Type Description Default
singleQuoted(string)

a single quoted string e.g. {'cities': [{'name': "Upper Hell's Gate"}]}

required
useRegex(boolean)

True if a regular expression shall be used for matching

required

Returns: string: the double quoted version of the string e.g. Note: see https://stackoverflow.com/a/50257217/1497139

Source code in lodentity/jsonable.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@staticmethod
def singleQuoteToDoubleQuoteUsingRegex(singleQuoted):
    """Convert a single quoted string to a double quoted one using a
    regular expression.

    Args:
        singleQuoted(string): a single quoted string e.g. {'cities': [{'name': "Upper Hell's Gate"}]}
    Returns:
        string: the double quoted version of the string
    Note:
        see https://stackoverflow.com/a/50257217/1497139
    """
    doubleQuoted = JSONAbleSettings.singleQuoteRegex.sub('"', singleQuoted)
    return doubleQuoted

storeJsonToFile(jsonStr, jsonFilePath) staticmethod

Store the given json string to the given jsonFilePath.

Parameters:

Name Type Description Default
jsonStr(string)

the string to store

required
jsonFilePath(string)

the path of the file where to store the result

required
Source code in lodentity/jsonable.py
152
153
154
155
156
157
158
159
160
161
@staticmethod
def storeJsonToFile(jsonStr, jsonFilePath):
    """Store the given json string to the given jsonFilePath.

    Args:
        jsonStr(string): the string to store
        jsonFilePath(string): the path of the file where to store the result
    """
    with open(jsonFilePath, "w") as jsonFile:
        jsonFile.write(jsonStr)

storeToJsonFile(jsonFile, extension='.json', limitToSampleFields=False)

Store me to the given jsonFile.

Parameters:

Name Type Description Default
jsonFile(str)

the JSON file name (optionally without extension)

required
extension(str)

the extension to use if not part of the jsonFile name

required
limitToSampleFields(bool)

If True the returned JSON is limited to the attributes/fields that are present in the samples. Otherwise all attributes of the object will be included. Default is False.

required
Source code in lodentity/jsonable.py
176
177
178
179
180
181
182
183
184
185
186
187
def storeToJsonFile(
    self, jsonFile: str, extension: str = ".json", limitToSampleFields: bool = False
):
    """Store me to the given jsonFile.

    Args:
        jsonFile(str): the JSON file name (optionally without extension)
        extension(str): the extension to use if not part of the jsonFile name
        limitToSampleFields(bool): If True the returned JSON is limited to the attributes/fields that are present in the samples. Otherwise all attributes of the object will be included. Default is False.
    """
    # make sure the file name carries the extension, then serialize and write
    jsonFile = self.checkExtension(jsonFile, extension)
    JSONAble.storeJsonToFile(self.toJSON(limitToSampleFields), jsonFile)

toJSON(limitToSampleFields=False)

Parameters:

Name Type Description Default
limitToSampleFields(bool)

If True the returned JSON is limited to the attributes/fields that are present in the samples. Otherwise all attributes of the object will be included. Default is False.

required

Returns:

Type Description

a recursive JSON dump of the dicts of my objects

Source code in lodentity/jsonable.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def toJSON(self, limitToSampleFields: bool = False):
    """Serialize me to a JSON string.

    Args:
        limitToSampleFields(bool): If True the returned JSON is limited to the attributes/fields that are present in the samples. Otherwise all attributes of the object will be included. Default is False.

    Returns:
        a recursive JSON dump of the dicts of my objects
    """
    data = {}
    if limitToSampleFields:
        # restrict the output to the fields known from my samples
        samples = self.getJsonTypeSamples()
        sampleFields = LOD.getFields(samples)
        if isinstance(self, JSONAbleList):
            # list mode: limit each record of my list to the sample fields
            limitedRecords = []
            for record in self.__dict__[self.listName]:
                limitedRecord = {}
                for key, value in record.__dict__.items():
                    if key in sampleFields:
                        limitedRecord[key] = value
                limitedRecords.append(limitedRecord)
            data[self.listName] = limitedRecords
        else:
            # single object mode: limit my own attributes to the sample fields
            for key, value in self.__dict__.items():
                if key in sampleFields:
                    data[key] = value
    else:
        # serialize all my attributes; toJsonAbleValue handles nested objects
        data = self
    jsonStr = json.dumps(
        data,
        default=lambda v: self.toJsonAbleValue(v),
        sort_keys=True,
        indent=JSONAbleSettings.indent,
    )
    return jsonStr

toJsonAbleValue(v)

return the JSON able value of the given value v Args: v(object): the value to convert

Source code in lodentity/jsonable.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def toJsonAbleValue(self, v):
    """Return a JSON-serializable representation of the given value.

    Args:
        v(object): the value to convert

    Returns:
        the __dict__ of an object, the ISO format string of a
        date or datetime, or an empty string otherwise
    """
    result = ""
    # objects have __dict__ hash tables which can be JSON-converted
    if hasattr(v, "__dict__"):
        result = v.__dict__
    elif isinstance(v, (datetime.datetime, datetime.date)):
        result = v.isoformat()
    return result

JSONAbleList

Bases: JSONAble

Container class.

Source code in lodentity/jsonable.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
class JSONAbleList(JSONAble):
    """Container class.

    Holds a list of entities under the attribute named by listName and
    optionally maps records to objects of the given clazz (ORM mode).
    """

    def __init__(
        self,
        listName: str = None,
        clazz=None,
        tableName: str = None,
        initList: bool = True,
        handleInvalidListTypes=False,
        filterInvalidListTypes=False,
    ):
        """Constructor.

        Args:
            listName(str): the name of the list attribute to be used for storing the List
            clazz(class): a class to be used for Object relational mapping (if any)
            tableName(str): the name of the "table" to be used
            initList(bool): True if the list should be initialized
            handleInvalidListTypes(bool): True if invalidListTypes should be converted or filtered
            filterInvalidListTypes(bool): True if invalidListTypes should be deleted
        """
        self.clazz = clazz
        self.handleInvalidListTypes = handleInvalidListTypes
        self.filterInvalidListTypes = filterInvalidListTypes
        if listName is None:
            if self.clazz is not None:
                listName = self.clazz.getPluralname()
            else:
                # fix: classes expose their name via __name__;
                # ".name" would raise an AttributeError here
                listName = self.__class__.__name__.lower()
        self.listName = listName
        if tableName is None:
            self.tableName = listName
        else:
            self.tableName = tableName
        if initList:
            self.__dict__[self.listName] = []

    def getList(self):
        """Get my list."""
        return self.__dict__[self.listName]

    def setListFromLoD(self, lod: list) -> list:
        """Set my list from the given list of dicts.

        Args:
            lod(list): a raw record list of dicts

        Returns:
            list: a list of dicts if no clazz is set
                otherwise a list of objects
        """
        # non OO mode
        if self.clazz is None:
            result = lod
            self.__dict__[self.listName] = result
        else:
            # ORM mode
            # TODO - handle errors
            self.fromLoD(lod, append=False)
        return self.getList()

    def getLoDfromJson(self, jsonStr: str, types=None, listName: str = None):
        """Get a list of Dicts from the given JSON String.

        Args:
            jsonStr(str): the JSON string
            types(Types): optional type information used to fix raw JSON values
            listName(str): if set, accept a top-level JSON list directly
        Returns:
            list: a list of dicts
        """
        # read a data structure from the given JSON string
        lodOrDict = json.loads(jsonStr)
        # it should be a list or a dict containing my list
        if not isinstance(lodOrDict, dict) and listName is not None:
            lod = lodOrDict
        else:
            if self.listName in lodOrDict:
                # get the relevant list of dicts
                lod = lodOrDict[self.listName]
            else:
                msg = f"invalid JSON for getLoD from Json\nexpecting a list of dicts or a dict with '{self.listName}' as list\nfound a dict with keys: {lodOrDict.keys()} instead"
                raise Exception(msg)
        if types is not None:
            types.fixTypes(lod, self.listName)
        return lod

    def fromLoD(self, lod, append: bool = True, debug: bool = False):
        """Load my entityList from the given list of dicts.

        Args:
            lod(list): the list of dicts to load
            append(bool): if True append to my existing entries
            debug(bool): if True print conversion errors as they occur

        Return:
            list: a list of errors (if any)
        """
        errors = []
        entityList = self.getList()
        if not append:
            # clear in place so existing references to the list stay valid
            del entityList[:]
        if self.handleInvalidListTypes:
            LOD.handleListTypes(lod=lod, doFilter=self.filterInvalidListTypes)

        for record in lod:
            # call the constructor to get a new instance
            try:
                entity = self.clazz()
                entity.fromDict(record)
                entityList.append(entity)
            except Exception as ex:
                # collect the failing record together with its exception
                error = {self.listName: record, "error": ex}
                errors.append(error)
                if debug:
                    print(error)
        return errors

    def getLookup(self, attrName: str, withDuplicates: bool = False):
        """Create a lookup dictionary by the given attribute name.

        Args:
            attrName(str): the attribute to lookup
            withDuplicates(bool): whether to retain single values or lists

        Return:
            a dictionary for lookup or a tuple dictionary,list of duplicates depending on withDuplicates
        """
        return LOD.getLookup(self.getList(), attrName, withDuplicates)

    def getJsonData(self):
        """Get my JSON data as a dict mapping my list name to my list."""
        jsonData = {self.listName: self.__dict__[self.listName]}
        return jsonData

    def toJsonAbleValue(self, v):
        """Make sure we don't store our meta information clazz, tableName and
        listName but just the list we are holding."""
        if v == self:
            return self.getJsonData()
        else:
            return super().toJsonAbleValue(v)

    def fromJson(self, jsonStr, types=None):
        """Initialize me from the given JSON string.

        Args:
            jsonStr(str): the JSON string
            types(Types): optional type information used to fix raw JSON values
        """
        lod = self.getLoDfromJson(jsonStr, types, listName=self.listName)
        self.setListFromLoD(lod)

    def asJSON(self, asString=True):
        """Return my JSON representation based on my list data only.

        Args:
            asString(bool): passed through to JSONAble.asJSON
        """
        jsonData = self.getJsonData()
        return super().asJSON(asString, data=jsonData)

    def restoreFromJsonFile(self, jsonFile: str) -> list:
        """Read my list of dicts from the given jsonFile and restore it."""
        lod = self.readLodFromJsonFile(jsonFile)
        return self.setListFromLoD(lod)

    def restoreFromJsonStr(self, jsonStr: str) -> list:
        """Restore me from the given jsonStr.

        Args:
            jsonStr(str): the json string to restore me from
        """
        lod = self.readLodFromJsonStr(jsonStr)
        return self.setListFromLoD(lod)

    def readLodFromJsonFile(self, jsonFile: str, extension: str = ".json"):
        """Read the list of dicts from the given jsonFile.

        Args:
            jsonFile(string): the jsonFile to read from
            extension(string): the extension to append if jsonFile has none

        Returns:
            list: a list of dicts
        """
        jsonFile = self.checkExtension(jsonFile, extension)
        jsonStr = JSONAble.readJsonFromFile(jsonFile)
        lod = self.readLodFromJsonStr(jsonStr)
        return lod

    def readLodFromJsonStr(self, jsonStr) -> list:
        """Read my list of dicts from the given jsonStr.

        Args:
            jsonStr(string): the JSON string to read the list of dicts from

        Returns:
            list: a list of dicts
        """
        # derive type information from my samples (non ORM) or my class (ORM)
        if self.clazz is None:
            typeSamples = self.getJsonTypeSamples()
        else:
            typeSamples = self.clazz.getSamples()
        if typeSamples is None:
            types = None
        else:
            types = Types(
                self.listName, warnOnUnsupportedTypes=not self.handleInvalidListTypes
            )
            types.getTypes(self.listName, typeSamples, len(typeSamples))
        lod = self.getLoDfromJson(jsonStr, types, listName=self.listName)
        return lod

__init__(listName=None, clazz=None, tableName=None, initList=True, handleInvalidListTypes=False, filterInvalidListTypes=False)

Constructor.

Parameters:

Name Type Description Default
listName(str)

the name of the list attribute to be used for storing the List

required
clazz(class)

a class to be used for Object relational mapping (if any)

required
tableName(str)

the name of the "table" to be used

required
initList(bool)

True if the list should be initialized

required
handleInvalidListTypes(bool)

True if invalidListTypes should be converted or filtered

required
filterInvalidListTypes(bool)

True if invalidListTypes should be deleted

required
Source code in lodentity/jsonable.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def __init__(
    self,
    listName: str = None,
    clazz=None,
    tableName: str = None,
    initList: bool = True,
    handleInvalidListTypes=False,
    filterInvalidListTypes=False,
):
    """Constructor.

    Args:
        listName(str): the name of the list attribute to be used for storing the List
        clazz(class): a class to be used for Object relational mapping (if any)
        tableName(str): the name of the "table" to be used
        initList(bool): True if the list should be initialized
        handleInvalidListTypes(bool): True if invalidListTypes should be converted or filtered
        filterInvalidListTypes(bool): True if invalidListTypes should be deleted
    """
    self.clazz = clazz
    self.handleInvalidListTypes = handleInvalidListTypes
    self.filterInvalidListTypes = filterInvalidListTypes
    if listName is None:
        if self.clazz is not None:
            listName = self.clazz.getPluralname()
        else:
            # fix: classes expose their name via __name__;
            # ".name" would raise an AttributeError here
            listName = self.__class__.__name__.lower()
    self.listName = listName
    if tableName is None:
        self.tableName = listName
    else:
        self.tableName = tableName
    if initList:
        self.__dict__[self.listName] = []

fromJson(jsonStr, types=None)

Initialize me from the given JSON string.

Parameters:

Name Type Description Default
jsonStr(str)

the JSON string

required
types(Types)

the types to be fixed

required
Source code in lodentity/jsonable.py
472
473
474
475
476
477
478
479
480
def fromJson(self, jsonStr, types=None):
    """Initialize me from the given JSON string.

    Args:
        jsonStr(str): the JSON string
        types(Types): optional type information used to fix raw JSON values
    """
    lod = self.getLoDfromJson(jsonStr, types, listName=self.listName)
    self.setListFromLoD(lod)

fromLoD(lod, append=True, debug=False)

Load my entityList from the given list of dicts.

Parameters:

Name Type Description Default
lod(list)

the list of dicts to load

required
append(bool)

if True append to my existing entries

required
Return

list: a list of errors (if any)

Source code in lodentity/jsonable.py
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
def fromLoD(self, lod, append: bool = True, debug: bool = False):
    """Load my entityList from the given list of dicts.

    Args:
        lod(list): the list of dicts to load
        append(bool): if True append to my existing entries
        debug(bool): if True print conversion errors as they occur

    Return:
        list: a list of errors (if any)
    """
    errors = []
    entityList = self.getList()
    if not append:
        # clear in place so existing references to the list stay valid
        del entityList[:]
    if self.handleInvalidListTypes:
        LOD.handleListTypes(lod=lod, doFilter=self.filterInvalidListTypes)

    for record in lod:
        # call the constructor to get a new instance
        try:
            entity = self.clazz()
            entity.fromDict(record)
            entityList.append(entity)
        except Exception as ex:
            # collect the failing record together with its exception
            error = {self.listName: record, "error": ex}
            errors.append(error)
            if debug:
                print(error)
    return errors

getJsonData()

Get my Jsondata.

Source code in lodentity/jsonable.py
459
460
461
462
def getJsonData(self):
    """Get my JSON data.

    Returns:
        dict: a dict mapping my listName to the list I am holding
    """
    jsonData = dict()
    jsonData[self.listName] = self.__dict__[self.listName]
    return jsonData

getList()

Get my list.

Source code in lodentity/jsonable.py
368
369
370
def getList(self):
    """Get my list.

    Returns:
        list: the list stored under my listName attribute
    """
    entityList = self.__dict__[self.listName]
    return entityList

getLoDfromJson(jsonStr, types=None, listName=None)

Get a list of Dicts from the given JSON String.

Parameters:

Name Type Description Default
jsonStr(str)

the JSON string

required
types(Types)

the types to be fixed

required

Returns: list: a list of dicts

Source code in lodentity/jsonable.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
def getLoDfromJson(self, jsonStr: str, types=None, listName: str = None):
    """Get a list of Dicts from the given JSON String.

    Args:
        jsonStr(str): the JSON string
        types(Types): optional type information used to fix raw JSON values
        listName(str): if set, accept a top-level JSON list directly
    Returns:
        list: a list of dicts
    """
    # read a data structure from the given JSON string
    lodOrDict = json.loads(jsonStr)
    # it should be a list or a dict containing my list
    if not isinstance(lodOrDict, dict) and listName is not None:
        lod = lodOrDict
    else:
        if self.listName in lodOrDict:
            # get the relevant list of dicts
            lod = lodOrDict[self.listName]
        else:
            msg = f"invalid JSON for getLoD from Json\nexpecting a list of dicts or a dict with '{self.listName}' as list\nfound a dict with keys: {lodOrDict.keys()} instead"
            raise Exception(msg)
    if types is not None:
        types.fixTypes(lod, self.listName)
    return lod

getLookup(attrName, withDuplicates=False)

Create a lookup dictionary by the given attribute name.

Parameters:

Name Type Description Default
attrName(str)

the attribute to lookup

required
withDuplicates(bool)

whether to retain single values or lists

required
Return

a dictionary for lookup or a tuple dictionary,list of duplicates depending on withDuplicates

Source code in lodentity/jsonable.py
447
448
449
450
451
452
453
454
455
456
457
def getLookup(self, attrName: str, withDuplicates: bool = False):
    """Create a lookup dictionary by the given attribute name.

    Args:
        attrName(str): the attribute to lookup
        withDuplicates(bool): whether to retain single values or lists

    Return:
        a dictionary for lookup or a tuple dictionary,list of duplicates depending on withDuplicates
    """
    # delegate to the LOD helper working on my raw list
    return LOD.getLookup(self.getList(), attrName, withDuplicates)

readLodFromJsonFile(jsonFile, extension='.json')

Read the list of dicts from the given jsonFile.

Parameters:

Name Type Description Default
jsonFile(string)

the jsonFile to read from

required

Returns:

Name Type Description
list

a list of dicts

Source code in lodentity/jsonable.py
500
501
502
503
504
505
506
507
508
509
510
511
512
def readLodFromJsonFile(self, jsonFile: str, extension: str = ".json"):
    """Read the list of dicts from the given jsonFile.

    Args:
        jsonFile(string): the jsonFile to read from
        extension(string): the extension to append if jsonFile has none

    Returns:
        list: a list of dicts
    """
    jsonFile = self.checkExtension(jsonFile, extension)
    jsonStr = JSONAble.readJsonFromFile(jsonFile)
    lod = self.readLodFromJsonStr(jsonStr)
    return lod

readLodFromJsonStr(jsonStr)

Read my list of dicts from the given jsonStr.

Parameters:

Name Type Description Default
jsonStr(string)

the JSON string to read the list of dicts from

required
Source code in lodentity/jsonable.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
def readLodFromJsonStr(self, jsonStr) -> list:
    """Read my list of dicts from the given jsonStr.

    Args:
        jsonStr(string): the JSON string to read the list of dicts from

    Returns:
        list: a list of dicts
    """
    # derive type information from my samples (non ORM) or my class (ORM)
    if self.clazz is None:
        typeSamples = self.getJsonTypeSamples()
    else:
        typeSamples = self.clazz.getSamples()
    if typeSamples is None:
        types = None
    else:
        types = Types(
            self.listName, warnOnUnsupportedTypes=not self.handleInvalidListTypes
        )
        types.getTypes(self.listName, typeSamples, len(typeSamples))
    lod = self.getLoDfromJson(jsonStr, types, listName=self.listName)
    return lod

restoreFromJsonFile(jsonFile)

Read my list of dicts and restore it.

Source code in lodentity/jsonable.py
486
487
488
489
def restoreFromJsonFile(self, jsonFile: str) -> list:
    """Read my list of dicts from the given jsonFile and restore it.

    Args:
        jsonFile(str): the JSON file to read from

    Returns:
        list: my restored list
    """
    lod = self.readLodFromJsonFile(jsonFile)
    return self.setListFromLoD(lod)

restoreFromJsonStr(jsonStr)

Restore me from the given jsonStr.

Parameters:

Name Type Description Default
jsonStr(str)

the json string to restore me from

required
Source code in lodentity/jsonable.py
491
492
493
494
495
496
497
498
def restoreFromJsonStr(self, jsonStr: str) -> list:
    """Restore me from the given jsonStr.

    Args:
        jsonStr(str): the json string to restore me from

    Returns:
        list: my restored list
    """
    lod = self.readLodFromJsonStr(jsonStr)
    return self.setListFromLoD(lod)

setListFromLoD(lod)

Set my list from the given list of dicts.

Returns:

Name Type Description
list list

a list of dicts if no clazz is set otherwise a list of objects

Source code in lodentity/jsonable.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def setListFromLoD(self, lod: list) -> list:
    """Set my list from the given list of dicts.

    Args:
        lod(list): a raw record list of dicts

    Returns:
        list: a list of dicts if no clazz is set
            otherwise a list of objects
    """
    # non OO mode
    if self.clazz is None:
        result = lod
        self.__dict__[self.listName] = result
    else:
        # ORM mode
        # TODO - handle errors
        self.fromLoD(lod, append=False)
    return self.getList()

toJsonAbleValue(v)

Make sure we don't store our meta information clazz, tableName and listName but just the list we are holding.

Source code in lodentity/jsonable.py
464
465
466
467
468
469
470
def toJsonAbleValue(self, v):
    """Make sure we don't store our meta information clazz, tableName and
    listName but just the list we are holding.

    Args:
        v(object): the value to convert
    """
    # when asked to serialize myself only expose my list data
    if v == self:
        return self.getJsonData()
    else:
        return super().toJsonAbleValue(v)

JSONAbleSettings

settings for JSONAble - put in a separate class so they would not be serialized

Source code in lodentity/jsonable.py
17
18
19
20
21
22
23
24
25
26
27
28
class JSONAbleSettings:
    """
    settings for JSONAble - put in a separate class so they would not be
    serialized
    """

    # indentation to use for json.dumps output
    indent = 4
    # regular expression to be used for conversion from singleQuote
    # to doubleQuote - matches single quotes not escaped by a backslash
    # see https://stackoverflow.com/a/50257217/1497139
    # (fix: this was a misplaced bare string that rendered as the
    # attribute docstring of "indent" in the generated documentation)
    singleQuoteRegex = re.compile("(?<!\\\\)'")

indent = 4 class-attribute instance-attribute

regular expression to be used for conversion from singleQuote to doubleQuote see https://stackoverflow.com/a/50257217/1497139

Types

Bases: JSONAble

Types.

holds entity meta Info

:ivar name(string): entity name = table name

Source code in lodentity/jsonable.py
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
class Types(JSONAble):
    """Types.

    holds entity meta Info

    :ivar name(string): entity name = table name
    """

    # mapping from type names to the corresponding python types
    typeName2Type = {
        "bool": bool,
        "date": datetime.date,
        "datetime": datetime.datetime,
        "float": float,
        "int": int,
        "str": str,
    }

    def __init__(self, name: str, warnOnUnsupportedTypes=True, debug=False):
        """Constructor.

        Args:
            name(str): the name of the type map
            warnOnUnsupportedTypes(bool): if TRUE warn if an item value has an unsupported type
            debug(bool): if True - debugging information should be shown
        """
        self.name = name
        self.warnOnUnsupportedTypes = warnOnUnsupportedTypes
        self.debug = debug
        # maps listName -> {field: typeName}
        self.typeMap = {}

    @staticmethod
    def forTable(
        instance, listName: str, warnOnUnsupportedTypes: bool = True, debug=False
    ):
        """Get the types for the list of Dicts (table) in the given instance
        with the given listName.

        Args:
            instance(object): the instance to inspect
            listName(string): the list of dicts to inspect
            warnOnUnsupportedTypes(bool): if TRUE warn if an item value has an unsupported type
            debug(bool): True if debugging information should be shown

        Returns:
            Types: a types object
        """
        clazz = type(instance)
        types = Types(
            clazz.__name__, warnOnUnsupportedTypes=warnOnUnsupportedTypes, debug=debug
        )
        types.getTypes(listName, instance.__dict__[listName])
        return types

    def addType(self, listName, field, valueType):
        """Add the python type for the given field to the typeMap.

        Args:
           listName(string): the name of the list of the field
           field(string): the name of the field
           valueType(type): the python type of the field
        """
        if listName not in self.typeMap:
            self.typeMap[listName] = {}
        typeMap = self.typeMap[listName]
        # keep the first type registered for a field
        if field not in typeMap:
            typeMap[field] = valueType

    def getTypes(self, listName: str, sampleRecords: list, limit: int = 10):
        """Determine the types for the given sample records.

        Args:
            listName(str): the name of the list
            sampleRecords(list): a list of items
            limit(int): the maximum number of items to check
        """
        for sampleRecord in sampleRecords[:limit]:
            items = sampleRecord.items()
            # with a single sample a None value can not be resolved from
            # another record, so allow the str fallback
            self.getTypesForItems(listName, items, warnOnNone=len(sampleRecords) == 1)

    def getTypesForItems(self, listName: str, items: list, warnOnNone: bool = False):
        """Get the types for the given items side effect is setting my types.

        Args:
            listName(str): the name of the list
            items(list): a list of items
            warnOnNone(bool): if TRUE use str as a fallback type for None values
        """
        supportedTypes = (str, int, float, bool, datetime.date, datetime.datetime)
        for key, value in items:
            valueType = None
            if value is None:
                if warnOnNone:
                    if self.debug:
                        print(
                            f"Warning sampleRecord field {key} is None - using string as type"
                        )
                    # fix: apply the str fallback independent of the debug
                    # flag - previously it was only set when debug was on,
                    # so typing results changed with the debug setting
                    valueType = str
            else:
                valueType = type(value)
            if valueType is not None and valueType not in supportedTypes:
                msg = f"warning: unsupported type {str(valueType)} for field {key}"
                if self.debug and self.warnOnUnsupportedTypes:
                    print(msg)
            if valueType is not None:
                self.addType(listName, key, valueType.__name__)

    def fixTypes(self, lod: list, listName: str):
        """Fix the types in the given data structure.

        Args:
            lod(list): a list of dicts
            listName(str): the types to lookup by list name
                NOTE(review): currently not used for filtering - the fix
                is applied for every list registered in my typeMap
        """
        # fix: use a distinct loop variable instead of shadowing the parameter
        for mapListName in self.typeMap:
            self.fixListOfDicts(self.typeMap[mapListName], lod)

    def getType(self, typeName):
        """Get the python type for the given type name.

        Args:
            typeName(str): the name of the type

        Returns:
            type: the python type or None if the name is unsupported
        """
        if typeName in Types.typeName2Type:
            return Types.typeName2Type[typeName]
        else:
            if self.debug:
                print("Warning unsupported type %s" % typeName)
            return None

    def fixListOfDicts(self, typeMap, listOfDicts):
        """Fix the type in the given list of Dicts."""
        for record in listOfDicts:
            for keyValue in record.items():
                key, value = keyValue
                if value is None:
                    record[key] = None
                elif key in typeMap:
                    valueType = self.getType(typeMap[key])
                    if valueType == bool:
                        if type(value) == str:
                            # accept common textual representations of True
                            b = value in ["True", "TRUE", "true"]
                        else:
                            b = value
                        record[key] = b
                    elif valueType == datetime.date:
                        dt = datetime.datetime.strptime(value, "%Y-%m-%d")
                        record[key] = dt.date()
                    elif valueType == datetime.datetime:
                        # see https://stackoverflow.com/questions/127803/how-do-i-parse-an-iso-8601-formatted-date
                        if isinstance(value, str):
                            if sys.version_info >= (3, 7):
                                dtime = datetime.datetime.fromisoformat(value)
                            else:
                                dtime = datetime.datetime.strptime(
                                    value, "%Y-%m-%dT%H:%M:%S.%f"
                                )
                        else:
                            # TODO: error handling
                            dtime = None
                        record[key] = dtime

__init__(name, warnOnUnsupportedTypes=True, debug=False)

Constructor.

Parameters:

Name Type Description Default
name(str)

the name of the type map

required
warnOnUnsupportedTypes(bool)

if TRUE warn if an item value has an unsupported type

required
debug(bool)

if True - debugging information should be shown

required
Source code in lodentity/jsonable.py
552
553
554
555
556
557
558
559
560
561
562
563
def __init__(self, name: str, warnOnUnsupportedTypes=True, debug=False):
    """Constructor.

    Args:
        name(str): the name of the type map
        warnOnUnsupportedTypes(bool): if TRUE warn if an item value has an unsupported type
        debug(bool): if True - debugging information should be shown
    """
    self.name = name
    self.warnOnUnsupportedTypes = warnOnUnsupportedTypes
    self.debug = debug
    # maps listName -> {field: typeName}
    self.typeMap = {}

addType(listName, field, valueType)

Add the python type for the given field to the typeMap.

Parameters:

Name Type Description Default
listName(string)

the name of the list of the field

required
field(string)

the name of the field

required
valueType(type)

the python type of the field

required
Source code in lodentity/jsonable.py
587
588
589
590
591
592
593
594
595
596
597
598
599
600
def addType(self, listName, field, valueType):
    """Register the python type for the given field in my typeMap.

    Args:
       listName(string): the name of the list the field belongs to
       field(string): the name of the field
       valueType(type): the python type of the field
    """
    fieldTypes = self.typeMap.setdefault(listName, {})
    # keep the first type registered for a field
    fieldTypes.setdefault(field, valueType)

fixListOfDicts(typeMap, listOfDicts)

Fix the type in the given list of Dicts.

Source code in lodentity/jsonable.py
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
def fixListOfDicts(self, typeMap, listOfDicts):
    """Fix the types in the given list of dicts.

    Converts string values in place to the python types registered in the
    given typeMap (bool, date and datetime are handled; other types are
    kept as-is).

    Args:
        typeMap(dict): maps a field name to a type name for the records
        listOfDicts(list): the records whose values are converted in place
    """
    for record in listOfDicts:
        for key, value in record.items():
            if value is None:
                record[key] = None
            elif key in typeMap:
                valueType = self.getType(typeMap[key])
                if valueType == bool:
                    if isinstance(value, str):
                        # accept the common textual spellings of True
                        record[key] = value in ("True", "TRUE", "true")
                    else:
                        record[key] = value
                elif valueType == datetime.date:
                    # guard against non-string values which strptime
                    # would reject with a TypeError
                    if isinstance(value, str):
                        dt = datetime.datetime.strptime(value, "%Y-%m-%d")
                        record[key] = dt.date()
                elif valueType == datetime.datetime:
                    # see https://stackoverflow.com/questions/127803/how-do-i-parse-an-iso-8601-formatted-date
                    if isinstance(value, str):
                        if sys.version_info >= (3, 7):
                            dtime = datetime.datetime.fromisoformat(value)
                        else:
                            dtime = datetime.datetime.strptime(
                                value, "%Y-%m-%dT%H:%M:%S.%f"
                            )
                    else:
                        # non-string datetime values cannot be parsed
                        dtime = None
                    record[key] = dtime

fixTypes(lod, listName)

Fix the types in the given data structure.

Parameters:

Name Type Description Default
lod(list)

a list of dicts

required
listName(str)

the types to lookup by list name

required
Source code in lodentity/jsonable.py
652
653
654
655
656
657
658
659
660
def fixTypes(self, lod: list, listName: str):
    """Fix the types in the given data structure.

    Args:
        lod(list): a list of dicts
        listName(str): the name of the list whose type map should be applied
    """
    # BUGFIX: the listName parameter used to be shadowed by the loop
    # variable "for listName in self.typeMap", so the type maps of ALL
    # lists were applied to lod and the argument was ignored; now only
    # the type map registered for the given listName is used
    typeMap = self.typeMap.get(listName)
    if typeMap is not None:
        self.fixListOfDicts(typeMap, lod)

forTable(instance, listName, warnOnUnsupportedTypes=True, debug=False) staticmethod

Get the types for the list of dicts (table) in the given instance with the given listName. Args: instance(object): the instance to inspect; listName(string): the list of dicts to inspect; warnOnUnsupportedTypes(bool): if TRUE warn if an item value has an unsupported type; debug(bool): True if debugging information should be shown

Returns:

Name Type Description
Types

a types object

Source code in lodentity/jsonable.py
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
@staticmethod
def forTable(
    instance, listName: str, warnOnUnsupportedTypes: bool = True, debug=False
):
    """
    Derive the types for the list of dicts (table) stored under the given
    listName in the given instance.

    Args:
        instance(object): the instance to inspect
        listName(string): the list of dicts to inspect
        warnOnUnsupportedTypes(bool): if TRUE warn if an item value has an unsupported type
        debug(bool): True if debugging information should be shown

    Returns:
        Types: a types object
    """
    ownerClass = type(instance)
    # name the type map after the owning class
    result = Types(
        ownerClass.__name__,
        warnOnUnsupportedTypes=warnOnUnsupportedTypes,
        debug=debug,
    )
    result.getTypes(listName, instance.__dict__[listName])
    return result

getType(typeName)

Get the type for the given type name.

Source code in lodentity/jsonable.py
662
663
664
665
666
667
668
669
def getType(self, typeName):
    """Resolve the python type for the given type name.

    Returns None (with an optional debug warning) for unsupported names.
    """
    # guard clause: unknown type names yield None
    if typeName not in Types.typeName2Type:
        if self.debug:
            print("Warning unsupported type %s" % typeName)
        return None
    return Types.typeName2Type[typeName]

getTypes(listName, sampleRecords, limit=10)

Determine the types for the given sample records.

Parameters:

Name Type Description Default
listName(str)

the name of the list

required
sampleRecords(list)

a list of items

required
limit(int)

the maximum number of items to check

required
Source code in lodentity/jsonable.py
602
603
604
605
606
607
608
609
610
611
612
def getTypes(self, listName: str, sampleRecords: list, limit: int = 10):
    """Determine the types from the given sample records.

    Args:
        listName(str): the name of the list
        sampleRecords(list): a list of items
        limit(int): the maximum number of records to inspect
    """
    # with a single sample record a None value can not be compensated
    # by another record, so warn in that case
    singleRecord = len(sampleRecords) == 1
    for sampleRecord in sampleRecords[:limit]:
        self.getTypesForItems(listName, sampleRecord.items(), warnOnNone=singleRecord)

getTypesForItems(listName, items, warnOnNone=False)

Get the types for the given items; as a side effect, my types are set.

Parameters:

Name Type Description Default
listName(str)

the name of the list

required
items(list)

a list of items

required
warnOnNone(bool)

if TRUE warn if an item value is None

required
Source code in lodentity/jsonable.py
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
def getTypesForItems(self, listName: str, items: list, warnOnNone: bool = False):
    """Get the types for the given items; as a side effect my types are set.

    Args:
        listName(str): the name of the list
        items(list): a list of (key, value) items
        warnOnNone(bool): if True, fall back to str for None values
            (the warning message itself is only printed in debug mode)
    """
    # the value types that are supported for later conversion
    supportedTypes = (str, int, float, bool, datetime.date, datetime.datetime)
    for key, value in items:
        valueType = None
        if value is None:
            if warnOnNone:
                # BUGFIX: the str fallback used to be nested inside the
                # debug guard, so type registration depended on the debug
                # flag; now only the warning is debug-only
                if self.debug:
                    print(
                        f"Warning sampleRecord field {key} is None - using string as type"
                    )
                valueType = str
        else:
            valueType = type(value)
            if valueType not in supportedTypes:
                # unsupported types are still registered as before,
                # but a warning may be shown
                if self.debug and self.warnOnUnsupportedTypes:
                    msg = f"warning: unsupported type {str(valueType)} for field {key}"
                    print(msg)
        if valueType is not None:
            self.addType(listName, key, valueType.__name__)

jsonpicklemixin

JsonPickleMixin

Bases: object

Allow reading and writing derived objects from a jsonpickle file.

Source code in lodentity/jsonpicklemixin.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class JsonPickleMixin(object):
    """Allow reading and writing derived objects from a jsonpickle file."""

    # class-wide debug flag: when True, read/write operations are traced
    debug = False

    @staticmethod
    def checkExtension(jsonFile: str, extension: str = ".json") -> str:
        """Make sure the jsonFile has the given extension e.g. ".json".

        Args:
            jsonFile(str): the jsonFile name - potentially without ".json" suffix
            extension(str): the file extension to enforce

        Returns:
            str: the jsonFile name with the extension guaranteed
        """
        if not jsonFile.endswith(extension):
            jsonFile = f"{jsonFile}{extension}"
        return jsonFile

    # read me from a json pickle file
    @staticmethod
    def readJsonPickle(jsonFileName, extension=".jsonpickle"):
        """Read an object from the given jsonpickle file.

        Args:
            jsonFileName(str): name of the file (optionally without ".json" postfix)
            extension(str): default file extension

        Returns:
            the decoded object, or None if the file does not exist
        """
        jsonFileName = JsonPickleMixin.checkExtension(jsonFileName, extension)
        # is there a jsonFile for the given name
        if os.path.isfile(jsonFileName):
            if JsonPickleMixin.debug:
                print("reading %s" % (jsonFileName))
            with open(jsonFileName) as jsonFile:
                json = jsonFile.read()
            result = jsonpickle.decode(json)
            if JsonPickleMixin.debug:
                print(json)
                print(result)
            return result
        else:
            return None

    def asJsonPickle(self) -> str:
        """Convert me to JSON.

        Returns:
            str: a JSON String with my JSON representation
        """
        json = jsonpickle.encode(self)
        return json

    def writeJsonPickle(self, jsonFileName: str, extension: str = ".jsonpickle"):
        """Write me to the json file with the given name (optionally without
        postfix)

        Args:
            jsonFileName(str): name of the file (optionally without ".json" postfix)
            extension(str): default file extension
        """
        jsonFileName = JsonPickleMixin.checkExtension(jsonFileName, extension)
        json = self.asJsonPickle()
        if JsonPickleMixin.debug:
            print("writing %s" % (jsonFileName))
            print(json)
            print(self)
        # BUGFIX: use a context manager so the file handle is closed
        # even when the write raises
        with open(jsonFileName, "w") as jsonFile:
            jsonFile.write(json)

asJsonPickle()

Convert me to JSON.

Returns:

Name Type Description
str str

a JSON String with my JSON representation

Source code in lodentity/jsonpicklemixin.py
49
50
51
52
53
54
55
56
def asJsonPickle(self) -> str:
    """Convert me to JSON.

    Returns:
        str: a JSON String with my JSON representation
    """
    # delegate the serialization entirely to jsonpickle
    return jsonpickle.encode(self)

checkExtension(jsonFile, extension='.json') staticmethod

Make sure the jsonFile has the given extension e.g. ".json".

Parameters:

Name Type Description Default
jsonFile(str)

the jsonFile name - potentially without ".json" suffix

required

Returns:

Name Type Description
str str

the jsonFile name with ".json" as an extension guaranteed

Source code in lodentity/jsonpicklemixin.py
12
13
14
15
16
17
18
19
20
21
22
23
24
@staticmethod
def checkExtension(jsonFile: str, extension: str = ".json") -> str:
    """Make sure the jsonFile has the given extension e.g. ".json".

    Args:
        jsonFile(str): the jsonFile name - potentially without ".json" suffix

    Returns:
        str: the jsonFile name with ".json" as an extension guaranteed
    """
    if not jsonFile.endswith(extension):
        jsonFile = f"{jsonFile}{extension}"
    return jsonFile

readJsonPickle(jsonFileName, extension='.jsonpickle') staticmethod

Parameters:

Name Type Description Default
jsonFileName(str)

name of the file (optionally without ".json" postfix)

required
extension(str)

default file extension

required
Source code in lodentity/jsonpicklemixin.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@staticmethod
def readJsonPickle(jsonFileName, extension=".jsonpickle"):
    """Read an object back from the given jsonpickle file.

    Args:
        jsonFileName(str): name of the file (optionally without ".json" postfix)
        extension(str): default file extension
    """
    jsonFileName = JsonPickleMixin.checkExtension(jsonFileName, extension)
    # guard clause: no file for the given name means nothing to read
    if not os.path.isfile(jsonFileName):
        return None
    if JsonPickleMixin.debug:
        print("reading %s" % (jsonFileName))
    with open(jsonFileName) as jsonFile:
        json = jsonFile.read()
    result = jsonpickle.decode(json)
    if JsonPickleMixin.debug:
        print(json)
        print(result)
    return result

writeJsonPickle(jsonFileName, extension='.jsonpickle')

Write me to the json file with the given name (optionally without postfix)

Parameters:

Name Type Description Default
jsonFileName(str)

name of the file (optionally without ".json" postfix)

required
extension(str)

default file extension

required
Source code in lodentity/jsonpicklemixin.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def writeJsonPickle(self, jsonFileName: str, extension: str = ".jsonpickle"):
    """Write me to the json file with the given name (optionally without
    postfix)

    Args:
        jsonFileName(str): name of the file (optionally without ".json" postfix)
        extension(str): default file extension
    """
    jsonFileName = JsonPickleMixin.checkExtension(jsonFileName, extension)
    json = self.asJsonPickle()
    if JsonPickleMixin.debug:
        print("writing %s" % (jsonFileName))
        print(json)
        print(self)
    # BUGFIX: use a context manager so the file handle is closed
    # even when the write raises
    with open(jsonFileName, "w") as jsonFile:
        jsonFile.write(json)