Skip to content

API Documentation

github

A script to download files from GitHub, preserving the original commit dates.

Created on 2023-12-12

@author: wf

Usage: python github.py ...

GithubDownloader

Downloader for github files with timestamp preservation.

Source code in thunderbird/github.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class GithubDownloader:
    """Downloader for github files with timestamp preservation."""

    def __init__(self, target_dir: str, urls: List[str]) -> None:
        """
        Initializes the GithubDownloader with a target directory and the GitHub file URLs to download.

        Args:
            target_dir: The directory where files will be downloaded (created if missing).
            urls: A list of GitHub file URLs; each one is wrapped in a GithubFile.
        """
        self.target_dir = target_dir
        os.makedirs(target_dir, exist_ok=True)
        # keyed by URL, so a URL that is given twice ends up as a single entry
        self.file_map = {url: GithubFile(url) for url in urls}

    def download_files(self) -> None:
        """Downloads all files in the prepared file map."""
        for gh_file in self.file_map.values():
            self.download_file(gh_file)

    def download_file(self, gh_file: "GithubFile") -> None:
        """
        Downloads a single file and applies its last commit date as file timestamp.

        The outcome of the download is appended to the status message of gh_file.
        Nothing is written when the HTTP status code is not 200.

        Args:
            gh_file(GithubFile): the download url, repo,filepath etc
        """
        response = requests.get(gh_file.raw_url)
        checkmark = "✅" if response.status_code == 200 else "❌"
        status_str = "successful" if response.status_code == 200 else "failed"
        gh_file.add_msg(f"{checkmark} Download {status_str}")

        if response.status_code == 200:
            local_path = os.path.join(self.target_dir, gh_file.file_path)
            # Create parent directory if it doesn't exist
            os.makedirs(os.path.dirname(local_path), exist_ok=True)

            with open(local_path, "wb") as file:
                file.write(response.content)
            gh_file.get_commit_date()
            gh_file.set_file_timestamp(self.target_dir)

    def set_file_timestamp(self, filepath: str, timestamp: str) -> None:
        """Sets the file's access and modification time.

        Args:
            filepath: The path to the local file.
            timestamp: The timestamp string in the form YYYY-MM-DDTHH:MM:SSZ (UTC).
        """
        # local import: only `datetime` itself is known to be in module scope
        from datetime import timezone

        parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
        # the trailing "Z" means UTC; a naive datetime would be
        # interpreted as local time by timestamp()
        mod_time = parsed.replace(tzinfo=timezone.utc).timestamp()
        os.utime(filepath, (mod_time, mod_time))

    @classmethod
    def from_args(cls, args: list) -> "GithubDownloader":
        """
        Creates an instance of GithubDownloader from command line arguments.

        Args:
            args: A list of command line arguments: the target directory
                followed by one or more URLs.

        Returns:
            An instance of GithubDownloader.

        Raises:
            ValueError: If insufficient arguments are provided.
        """
        if len(args) < 2:
            raise ValueError("Insufficient arguments. Expected at least 2 arguments.")
        target_dir = args[0]
        urls = args[1:]
        return cls(target_dir, urls)

__init__(target_dir, urls)

Initializes the GithubDownloader with a target directory and a list of GitHub file URLs to download.

Parameters:

Name Type Description Default
target_dir str

The directory where files will be downloaded.

required
urls List[str]

A list of GitHub file URLs to download.

required
Source code in thunderbird/github.py
104
105
106
107
108
109
110
111
112
113
114
115
116
def __init__(self, target_dir: str, urls: List[str]) -> None:
    """
    Set up the downloader for a target directory and a list of GitHub file URLs.

    Args:
        target_dir: The directory where files will be downloaded; it is created if missing.
        urls: The GitHub file URLs; each is wrapped in a GithubFile and stored in file_map.
    """
    self.target_dir = target_dir
    os.makedirs(target_dir, exist_ok=True)
    # map each URL to its parsed GithubFile
    self.file_map = {url: GithubFile(url) for url in urls}

download_file(gh_file)

Downloads a file based on the provided GithubFile.

Parameters:

Name Type Description Default
gh_file(GithubFile)

the download url, repo,filepath etc

required
Source code in thunderbird/github.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def download_file(self, gh_file: GithubFile) -> None:
    """
    Fetch one file from its raw URL and store it below the target directory.

    The download outcome is appended to the status message of gh_file. Only on
    HTTP status 200 is the content written, the commit date looked up and the
    file timestamp set.

    Args:
        gh_file(GithubFile): the download url, repo,filepath etc
    """
    response = requests.get(gh_file.raw_url)
    succeeded = response.status_code == 200
    if succeeded:
        checkmark, status_str = "✅", "successful"
    else:
        checkmark, status_str = "❌", "failed"
    gh_file.add_msg(f"{checkmark} Download {status_str}")
    if not succeeded:
        return

    local_path = os.path.join(self.target_dir, gh_file.file_path)
    # the file may live in a subdirectory that does not exist yet
    parent_dir = os.path.dirname(local_path)
    os.makedirs(parent_dir, exist_ok=True)

    with open(local_path, "wb") as file:
        file.write(response.content)
    gh_file.get_commit_date()
    gh_file.set_file_timestamp(self.target_dir)

download_files()

Downloads all files in the prepared file map.

Source code in thunderbird/github.py
118
119
120
121
def download_files(self) -> None:
    """Download every file registered in the file map, in insertion order."""
    for gh_file in self.file_map.values():
        self.download_file(gh_file)

from_args(args) classmethod

Creates an instance of GithubDownloader from command line arguments.

Parameters:

Name Type Description Default
args list

A list of command line arguments.

required

Returns:

Type Description
GithubDownloader

An instance of GithubDownloader.

Raises:

Type Description
ValueError

If insufficient arguments are provided.

Source code in thunderbird/github.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@classmethod
def from_args(cls, args: list) -> "GithubDownloader":
    """
    Build a GithubDownloader from command line arguments.

    Args:
        args: The command line arguments: the target directory first,
            followed by at least one URL.

    Returns:
        An instance of GithubDownloader.

    Raises:
        ValueError: If fewer than two arguments are provided.
    """
    if len(args) >= 2:
        # first argument is the target directory, the rest are URLs
        return cls(args[0], args[1:])
    raise ValueError("Insufficient arguments. Expected at least 2 arguments.")

set_file_timestamp(filepath, timestamp)

Sets the file's timestamp.

Parameters:

Name Type Description Default
filepath str

The path to the local file.

required
timestamp str

The timestamp string in ISO format.

required
Source code in thunderbird/github.py
145
146
147
148
149
150
151
152
153
154
def set_file_timestamp(self, filepath: str, timestamp: str) -> None:
    """Sets the file's access and modification time.

    Args:
        filepath: The path to the local file.
        timestamp: The timestamp string in the form YYYY-MM-DDTHH:MM:SSZ (UTC).
    """
    # local import: only `datetime` itself is known to be in module scope
    from datetime import timezone

    parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
    # the trailing "Z" means UTC; a naive datetime would be
    # interpreted as local time by timestamp()
    mod_time = parsed.replace(tzinfo=timezone.utc).timestamp()
    os.utime(filepath, (mod_time, mod_time))

GithubFile dataclass

Data class to hold the details for each github file.

Attributes:

Name Type Description
url str

The URL to the file.

repo str

The repository name.

file_path str

The relative path of the file.

status_msg Optional[str]

A status message regarding the download.

timestamp Optional[datetime]

The timestamp of the last commit.

timestamp_iso Optional[str]

timestamp in iso format

Source code in thunderbird/github.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@dataclass
class GithubFile:
    """
    Data class to hold the details for each github file.

    Attributes:
        url (str): The URL to the file (https://github.com/<owner>/<repo>/blob/<branch>/<path>).
        repo (str): The repository in "owner/name" form, parsed from the URL.
        branch (str): The branch parsed from the URL.
        file_path (str): The relative path of the file.
        raw_url (str): The raw.githubusercontent.com URL derived from the URL.
        status_msg (Optional[str]): A status message regarding the download.
        timestamp (Optional[datetime]): The timestamp of the last commit (naive, in UTC).
        timestamp_iso (Optional[str]): timestamp in iso format
    """

    url: str
    repo: str = field(init=False)
    branch: str = field(init=False)
    file_path: str = field(init=False)
    raw_url: str = field(init=False)
    status_msg: Optional[str] = None
    timestamp: Optional[datetime] = None
    timestamp_iso: Optional[str] = None  # ISO format timestamp

    def __post_init__(self):
        """
        Post-initialization to parse the URL and construct the raw URL for downloading the file.

        Raises:
            ValueError: If the URL does not match the expected GitHub blob URL format.
        """
        pattern = r"https://github\.com/(?P<repo>[^/]+/[^/]+)/blob/(?P<branch>[^/]+)/(?P<file_path>.+)"
        match = re.match(pattern, self.url)
        if match:
            self.repo = match.group("repo")
            self.branch = match.group("branch")
            self.file_path = match.group("file_path")
            self.raw_url = f"https://raw.githubusercontent.com/{self.repo}/{self.branch}/{self.file_path}"
        else:
            raise ValueError(f"URL does not match expected format: {self.url}")

    def to_json(self):
        """Converts this object to JSON; non-JSON values (e.g. datetime) are stringified."""
        return json.dumps(asdict(self), default=str)

    def get_commit_date(self) -> Optional[str]:
        """
        Gets the last commit date of my file from GitHub
        and sets my timestamp accordingly

        On failure (non-200 response or no commits listed) a message is added
        to the status message and the timestamp is reset to None.

        Returns:
            The commit date in ISO format, or None if it could not be determined.
        """
        api_url = f"https://api.github.com/repos/{self.repo}/commits"
        # sha restricts the lookup to the branch given in the URL; passing the
        # values via params lets requests URL-encode the path; only the newest
        # commit is needed
        params = {"path": self.file_path, "sha": self.branch, "per_page": 1}
        response = requests.get(api_url, params=params)
        commits = response.json() if response.status_code == 200 else None
        if commits:
            commit_date = commits[0]["commit"]["committer"]["date"]
            self.timestamp = datetime.strptime(commit_date, "%Y-%m-%dT%H:%M:%SZ")
            self.timestamp_iso = self.timestamp.isoformat()
            return self.timestamp_iso
        msg = f"can't access commit date for {self.repo}:{self.file_path}"
        self.add_msg(msg)
        self.timestamp = None
        return None

    def set_file_timestamp(self, target_dir: str):
        """
        Sets the file's timestamp based on the commit date.

        Does nothing if no timestamp is available or the local file does not exist.

        Args:
            target_dir: The directory the file was downloaded to.
        """
        if not isinstance(self.timestamp, datetime):
            return
        # local import: only `datetime` itself is known to be in module scope
        from datetime import timezone

        commit_time = self.timestamp
        if commit_time.tzinfo is None:
            # get_commit_date parses a "...Z" (UTC) string into a naive datetime;
            # without this, timestamp() would interpret it as local time
            commit_time = commit_time.replace(tzinfo=timezone.utc)
        mod_time = commit_time.timestamp()
        local_path = os.path.join(target_dir, self.file_path)
        if os.path.isfile(local_path):
            os.utime(local_path, (mod_time, mod_time))

    def add_msg(self, msg):
        """
        Appends msg to the status message, separated by a newline from earlier messages.

        Args:
            msg: the message to add
        """
        if self.status_msg is None:
            self.status_msg = ""
            delim = ""
        else:
            delim = "\n"
        self.status_msg = f"{self.status_msg}{delim}{msg}"

__post_init__()

Post-initialization to parse the URL and construct the raw URL for downloading the file.

Source code in thunderbird/github.py
46
47
48
49
50
51
52
53
54
55
56
57
58
def __post_init__(self):
    """
    Parse the GitHub blob URL into repo, branch and file path and derive the raw download URL.

    Raises:
        ValueError: If the URL does not match the expected format.
    """
    pattern = r"https://github\.com/(?P<repo>[^/]+/[^/]+)/blob/(?P<branch>[^/]+)/(?P<file_path>.+)"
    parsed = re.match(pattern, self.url)
    if not parsed:
        raise ValueError(f"URL does not match expected format: {self.url}")
    self.repo, self.branch, self.file_path = parsed.group("repo", "branch", "file_path")
    self.raw_url = f"https://raw.githubusercontent.com/{self.repo}/{self.branch}/{self.file_path}"

get_commit_date()

Gets the last commit date of my file from GitHub and sets my timestamp accordingly

Source code in thunderbird/github.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def get_commit_date(self) -> Optional[str]:
    """
    Gets the last commit date of my file from GitHub
    and sets my timestamp accordingly

    On failure (non-200 response or no commits listed) a message is added
    to the status message and the timestamp is reset to None.

    Returns:
        The commit date in ISO format, or None if it could not be determined.
    """
    api_url = f"https://api.github.com/repos/{self.repo}/commits"
    # sha restricts the lookup to the branch given in the URL; passing the
    # values via params lets requests URL-encode the path; only the newest
    # commit is needed
    params = {"path": self.file_path, "sha": self.branch, "per_page": 1}
    response = requests.get(api_url, params=params)
    commits = response.json() if response.status_code == 200 else None
    if commits:
        commit_date = commits[0]["commit"]["committer"]["date"]
        self.timestamp = datetime.strptime(commit_date, "%Y-%m-%dT%H:%M:%SZ")
        self.timestamp_iso = self.timestamp.isoformat()
        return self.timestamp_iso
    msg = f"can't access commit date for {self.repo}:{self.file_path}"
    self.add_msg(msg)
    self.timestamp = None
    return None

set_file_timestamp(target_dir)

Sets the file's timestamp based on the commit date.

Source code in thunderbird/github.py
84
85
86
87
88
89
90
def set_file_timestamp(self, target_dir: str):
    """
    Sets the file's timestamp based on the commit date.

    Does nothing if no timestamp is available or the local file does not exist.

    Args:
        target_dir: The directory the file was downloaded to.
    """
    if not isinstance(self.timestamp, datetime):
        return
    # local import: only `datetime` itself is known to be in module scope
    from datetime import timezone

    commit_time = self.timestamp
    if commit_time.tzinfo is None:
        # the commit date is parsed from a "...Z" (UTC) string into a naive
        # datetime; without this, timestamp() would interpret it as local time
        commit_time = commit_time.replace(tzinfo=timezone.utc)
    mod_time = commit_time.timestamp()
    local_path = os.path.join(target_dir, self.file_path)
    if os.path.isfile(local_path):
        os.utime(local_path, (mod_time, mod_time))

to_json()

Converts this object to JSON.

Source code in thunderbird/github.py
60
61
62
def to_json(self):
    """Serialize this dataclass instance to a JSON string; values json cannot encode are passed through str()."""
    record = asdict(self)
    json_text = json.dumps(record, default=str)
    return json_text

main()

Main function to run the script.

Source code in thunderbird/github.py
177
178
179
180
181
182
def main():
    """
    Script entry point: build a downloader from the command line arguments and download all files.
    """
    cli_args = sys.argv[1:]
    GithubDownloader.from_args(cli_args).download_files()

mail

Created on 2020-10-24

@author: wf

IndexingState dataclass

State of the index_db for all mailboxes

Source code in thunderbird/mail.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@dataclass
class IndexingState:
    """
    State of the index_db for all mailboxes
    """
    total_mailboxes: int = 0
    total_successes: int = 0
    total_errors: int = 0
    force_create: bool = False
    # NOTE(review): no type annotation, so this is a plain class attribute and
    # not a dataclass field / __init__ parameter - confirm that this is intended
    index_up_to_date = False
    success: Counter = field(default_factory=Counter)
    errors: Dict[str, Exception] = field(default_factory=dict)
    gloda_db_update_time: Optional[datetime] = None
    index_db_update_time: Optional[datetime] = None
    msg: str = ""

    def update_msg(self):
        """
        Build the progress message from the current totals.

        Returns:
            the message; it is not stored in self.msg by this method
        """
        msg=f"{self.total_successes}/{self.total_mailboxes} updated - {self.total_errors} errors"
        return msg

    @property
    def error_rate(self):
        """
        Calculate the error rate in percent
        """
        if self.total_mailboxes > 0:
            error_rate = self.total_errors / self.total_mailboxes *100
        else:
            error_rate=0.0
        return error_rate

    def show_index_report(self, verbose: bool=False, with_print:bool=True)->str:
        """
        Displays a report on the indexing results of email mailboxes.

        The returned report has the same content whether or not it is printed.

        Args:
            verbose (bool): If True, displays detailed error and success messages.
            with_print(bool): if True - actually print out the index report
        Returns:
            an indexing report message
        """
        report=self.msg
        if with_print:
            print(self.msg)
        if verbose:
            # Detailed error messages
            if self.errors:
                err_msg = "Errors occurred during index creation:\n"
                for path, error in self.errors.items():
                    err_msg += f"Error in {path}: {error}\n"
                if with_print:
                    print(err_msg, file=sys.stderr)
                report+="\n"+err_msg

            # Detailed success messages
            if self.success:
                success_msg = "Index created successfully for:\n"
                for path, count in self.success.items():
                    success_msg += f"{path}: {count} entries\n"
                if with_print:
                    print(success_msg)
                report+="\n"+success_msg

        # Summary message
        total_errors = len(self.errors)
        total_successes = sum(self.success.values())
        average_success = (
            (total_successes / len(self.success))
            if self.success
            else 0
        )
        error_rate = (
            (total_errors / self.total_mailboxes) * 100
            if self.total_mailboxes > 0
            else 0
        )

        marker = "❌ " if total_errors > 0 else "✅"
        summary_msg = (
            f"Indexing completed: {marker}Total indexed messages: {total_successes}, "
            f"Average messages per successful mailbox: {average_success:.2f}, "
            f"{total_errors} mailboxes with errors ({error_rate:.2f}%)."
        )
        # the summary goes to stderr as soon as there is at least one error
        msg_channel = sys.stderr if total_errors > 0 else sys.stdout
        if self.total_mailboxes > 0:
            # honor with_print for the summary as well
            if with_print:
                print(summary_msg, file=msg_channel)
            report+="\n"+summary_msg
        return report

    def get_update_lod(self):
        """
        get a list of dict of update records

        Returns:
            one record per mailbox, numbered starting at 1
        """
        # NOTE(review): mailboxes_to_update is not declared as a field of this
        # class - presumably it is assigned elsewhere; verify against callers
        update_lod = []
        for i,mailbox in enumerate(self.mailboxes_to_update.values()):
            mb_record=mailbox.as_view_record(index=i+1)
            update_lod.append(mb_record)
        return update_lod

error_rate property

Calculate the error rate in percent

get_update_lod()

get a list of dict of update records

Source code in thunderbird/mail.py
217
218
219
220
221
222
223
224
225
def get_update_lod(self):
    """
    Collect the view records of all mailboxes to update as a list of dicts.

    Returns:
        one record per mailbox, numbered starting at 1
    """
    mailboxes = self.mailboxes_to_update.values()
    return [
        mailbox.as_view_record(index=position)
        for position, mailbox in enumerate(mailboxes, start=1)
    ]

show_index_report(verbose=False, with_print=True)

Displays a report on the indexing results of email mailboxes.

Parameters:

Name Type Description Default
indexing_result IndexingResult

The result of the indexing process containing detailed results.

required
verbose bool

If True, displays detailed error and success messages.

False
with_print(bool)

if True - actually print out the index report

required

Returns: an indexing report message

Source code in thunderbird/mail.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def show_index_report(self, verbose: bool=False, with_print:bool=True)->str:
    """
    Displays a report on the indexing results of email mailboxes.

    The returned report has the same content whether or not it is printed.

    Args:
        verbose (bool): If True, displays detailed error and success messages.
        with_print(bool): if True - actually print out the index report
    Returns:
        an indexing report message
    """
    report=self.msg
    if with_print:
        print(self.msg)
    if verbose:
        # Detailed error messages
        if self.errors:
            err_msg = "Errors occurred during index creation:\n"
            for path, error in self.errors.items():
                err_msg += f"Error in {path}: {error}\n"
            if with_print:
                print(err_msg, file=sys.stderr)
            report+="\n"+err_msg

        # Detailed success messages
        if self.success:
            success_msg = "Index created successfully for:\n"
            for path, count in self.success.items():
                success_msg += f"{path}: {count} entries\n"
            if with_print:
                print(success_msg)
            report+="\n"+success_msg

    # Summary message
    total_errors = len(self.errors)
    total_successes = sum(self.success.values())
    average_success = (
        (total_successes / len(self.success))
        if self.success
        else 0
    )
    error_rate = (
        (total_errors / self.total_mailboxes) * 100
        if self.total_mailboxes > 0
        else 0
    )

    marker = "❌ " if total_errors > 0 else "✅"
    summary_msg = (
        f"Indexing completed: {marker}Total indexed messages: {total_successes}, "
        f"Average messages per successful mailbox: {average_success:.2f}, "
        f"{total_errors} mailboxes with errors ({error_rate:.2f}%)."
    )
    # the summary goes to stderr as soon as there is at least one error
    msg_channel = sys.stderr if total_errors > 0 else sys.stdout
    if self.total_mailboxes > 0:
        # honor with_print for the summary as well
        if with_print:
            print(summary_msg, file=msg_channel)
        report+="\n"+summary_msg
    return report

Mail

Bases: object

a single mail

Source code in thunderbird/mail.py
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
class Mail(object):
    """
    a single mail
    """

    def __init__(self, user, mailid, tb=None, debug=False, keySearch=True):
        """
        Constructor

        Args:
            user(string): userid of the user
            mailid(string): unique id of the mail
            debug(bool): True if debugging should be activated
            keySearch(bool): True if a slow keySearch should be tried when lookup fails
        """
        self.debug = debug
        self.user = user
        if tb is None:
            self.tb = Thunderbird.get(user)
        else:
            self.tb = tb
        mailid = Mail.normalize_mailid(mailid)
        self.mailid = mailid
        self.keySearch = keySearch
        self.rawMsg = None
        self.msg = None
        self.headers = {}
        self.fromUrl = None
        self.fromMailTo = None
        self.toUrl = None
        self.toMailTo = None
        mail_record = self.search()
        if mail_record is not None:
            mail_lookup = MailLookup.from_mail_record(mail_record)
            self.folder_path = mail_lookup.folder_path
            folderPath = self.tb.local_folders + mail_lookup.folder_path
            tb_mbox = ThunderbirdMailbox(self.tb, folderPath, debug=self.debug)
            found=False
            if mail_lookup.start_pos is not None and mail_lookup.stop_pos is not None:
                self.msg = tb_mbox.get_message_by_pos(mail_lookup.start_pos, mail_lookup.stop_pos)
                found = self.check_mailid()
            if not found:
                # Fallback to other methods if start_pos and stop_pos are not available
                self.msg = tb_mbox.get_message_by_key(mail_lookup.message_index)
            # if lookup fails we might loop thru
            # all messages if this option is active ...
            found = self.check_mailid()
            if not found and self.keySearch:
                self.msg = tb_mbox.search_message_by_key(self.mailid)
            if self.msg is not None:
                if self.check_mailid():
                    self.extract_message()
                else:
                    self.msg = None
            tb_mbox.close()

    def check_mailid(self) -> bool:
        """
        check the mailid
        """
        found = False
        self.extract_headers()
        # workaround awkward mail ID handling 
        # headers should be case insensitive but in reality they might be not
        # if any message-id fits the self.mailid we'll consider the mail as found
        id_headers = ["Message-ID","Message-Id"]
        for id_header in id_headers:
            if id_header in self.headers:
                header_id = self.headers[id_header]
                header_id = self.normalize_mailid(header_id)
                if header_id == self.mailid:
                    found=True
        return found

    @classmethod
    def get_iso_date(cls, msg) -> Tuple[str, Optional[str], Optional[str]]:
        """
        Extracts and formats the date from the email header in ISO format.

        Args:
            msg (Mail): The mail object from which to extract the date.

        Returns:
            Tuple[str, Optional[str], Optional[str]]: A tuple containing the msg_date, the formatted date in ISO format,
            and an error message if the date cannot be extracted or parsed, otherwise None.
        """
        date_parser = DateParser()
        msg_date = msg.get("Date", "")
        iso_date = "?"
        error_msg = None
        if msg_date:
            try:
                iso_date = date_parser.parse_date(msg_date)
            except Exception as e:
                error_msg = f"Error parsing date '{msg_date}': {e}"
        return msg_date, iso_date, error_msg

    @classmethod
    def normalize_mailid(cls, mail_id: str) -> str:
        """
        remove the surrounding <> of the given mail_id
        """
        mail_id = re.sub(r"\<(.*)\>", r"\1", mail_id)
        return mail_id

    def as_html_error_msg(self) -> str:
        """
        Generates an HTML formatted error message for the Mail instance.

        This method should be called when a Mail object is requested but not found.
        It uses the `normalize_mailid` method to format the mail ID in the error message.

        Returns:
            str: An HTML string representing the error message.
        """
        normalized_mailid = Mail.normalize_mailid(self.mailid)
        html_error_msg = f"<span style='color: red;'>Mail with id {normalized_mailid} not found</span>"
        return html_error_msg

    def extract_headers(self):
        """
        update the headers
        """
        if not self.msg:
            self.headers = {}
        else:
            for key in self.msg.keys():
                # https://stackoverflow.com/a/21715870/1497139
                self.headers[key] = str(make_header(decode_header(self.msg.get(key))))

    def extract_message(self, lenient: bool = False) -> None:
        """
        Extracts the message body and headers from the email message.

        This method decodes each part of the email message, handling different content types and charsets. It appends
        the decoded text to the message object's text and HTML attributes.

        Args:
            lenient (bool): If True, the method will not raise an exception for decoding errors, and will instead skip the problematic parts.

        """
        if len(self.headers) == 0:
            self.extract_headers()
        self.txtMsg = ""
        self.html = ""
        # https://stackoverflow.com/a/43833186/1497139
        self.msgParts = []
        # decode parts
        # https://stackoverflow.com/questions/59554237/how-to-handle-all-charset-and-content-type-when-reading-email-from-imap-lib-in-p
        # https://gist.github.com/miohtama/5389146
        for part in self.msg.walk():
            self.msgParts.append(part)
            part.length = len(part._payload)
            # each part is a either non-multipart, or another multipart message
            # that contains further parts... Message is organized like a tree
            contentType = part.get_content_type()
            charset = part.get_content_charset()
            if charset is None:
                charset = "utf-8"
            partname = part.get_param("name")
            part.filename = self.fixedPartName(
                partname, contentType, len(self.msgParts)
            )
            if contentType == "text/plain" or contentType == "text/html":
                part_str = part.get_payload(decode=1)
                rawPart = self.try_decode(part_str, charset, lenient)
                if rawPart is not None:
                    if contentType == "text/plain":
                        self.txtMsg += rawPart
                    elif contentType == "text/html":
                        self.html += rawPart
            pass
        self.handle_headers()

    def try_decode(self, byte_str: bytes, charset: str, lenient: bool) -> str:
        """
        Attempts to decode a byte string using multiple charsets.

        Tries to decode the byte string using a series of common charsets, returning the decoded string upon success.
        If all attempts fail, it either raises a UnicodeDecodeError or returns None, based on the lenient flag.

        Args:
            byte_str (bytes): The byte string to be decoded.
            charset (str): The initial charset to attempt decoding with.
            lenient (bool): If True, suppresses UnicodeDecodeError and returns None for undecodable byte strings.

        Returns:
            str: The decoded string, or None if lenient is True and decoding fails.

        Raises:
            UnicodeDecodeError: If decoding fails and lenient is False.

        """
        charsets_to_try = [charset, "utf-8", "iso-8859-1", "ascii"]
        # Ensure no duplicate charsets in the list
        unique_charsets = list(dict.fromkeys(charsets_to_try))

        for encoding in unique_charsets:
            try:
                decoded = byte_str.decode(encoding)
                return decoded
            except UnicodeDecodeError:
                continue

        if not lenient:
            raise UnicodeDecodeError(
                f"Failed to decode with charsets: {unique_charsets}"
            )
        return None

    def handle_headers(self):
        # sort the headers
        self.headers = OrderedDict(sorted(self.headers.items()))
        if "From" in self.headers:
            fromAdr = self.headers["From"]
            self.fromMailTo = f"mailto:{fromAdr}"
            self.fromUrl = f"<a href='{self.fromMailTo}'>{fromAdr}</a>"
        if "To" in self.headers:
            toAdr = self.headers["To"]
            self.toMailTo = f"mailto:{toAdr}"
            self.toUrl = f"<a href='{self.toMailTo}'>{toAdr}</a>"
        pass

    def search(self, use_index_db: bool = True) -> Optional[Dict[str, Any]]:
        """
        Search for an email by its ID in the specified Thunderbird mailbox database.

        This method allows searching either the gloda database or the index database based on the `use_index_db` parameter.
        It returns a dictionary representing the found email or None if not found.

        Args:
            use_index_db (bool): If True, the search will be performed in the index database.
                                 If False, the search will be performed in the gloda database (default).

        Returns:
            Optional[Dict[str, Any]]: A dictionary representing the found email, or None if not found.
        """
        if self.debug:
            print(f"Searching for mail with id {self.mailid} for user {self.user}")

        if use_index_db and self.tb.index_db_exists():
            # Query for the index database
            query = """SELECT * FROM mail_index 
                       WHERE message_id = ?"""
            source = "index_db"
            params = (f"<{self.mailid}>",)
        else:
            # Query for the gloda database
            query = """SELECT m.*, f.* 
                       FROM messages m JOIN
                            folderLocations f ON m.folderId = f.id
                       WHERE m.headerMessageID = (?)"""
            source = "gloda"
            params = (self.mailid,)

        db = (
            self.tb.index_db
            if use_index_db and self.tb.index_db_exists()
            else self.tb.sqlDB
        )
        maillookup = db.query(query, params)

        if self.debug:
            print(maillookup)

        # Store the result in a variable before returning
        mail_record = maillookup[0] if maillookup else None
        if mail_record:
            mail_record["source"] = source
            if not "message_id" in mail_record:
                mail_record["message_id"] = self.mailid
        return mail_record

    def fixedPartName(self, partname: str, contentType: str, partIndex: int):
        """
        get a fixed version of the partname

        Args:
            partname(str): the name of the part - may be empty or a 3-tuple
                whose last element is the name
            contentType(str): the content type used to guess a file extension
                when there is no partname
            partIndex(int): the index of the part used for the default name

        Returns:
            str: the decoded partname or "part{partIndex}{ext}" passed through fix_text
        """
        if not partname:
            # no name given - derive one from the part index and content type
            mime_type = contentType.partition(";")[0].strip()
            ext = guess_extension(mime_type)
            if ext is None:
                ext = ".txt"
            return fix_text(f"part{partIndex}{ext}")
        # avoid TypeError: expected string or bytes-like object
        if type(partname) is tuple:
            _encoding, _unknown, partname = partname
        decoded_name = str(make_header(decode_header(partname)))
        return fix_text(decoded_name)

    def __str__(self):
        text = f"{self.user}/{self.mailid}"
        return text

    def getHeader(self, headerName: str):
        """
        get the header with the given name

        Args:
            headerName(str): the name of the header

        Returns:
            str: the header value
        """
        if headerName in self.headers:
            headerValue = self.headers[headerName]
        else:
            headerValue = "?"
        return headerValue

    def asWikiMarkup(self) -> str:
        """
        convert me to wiki markup in Wikison notation

        Returns:
            str: a http://wiki.bitplan.com/index.php/WikiSon notation
        """
        if len(self.headers) == 0:
            self.extract_headers()
        _msg_date, iso_date, _error_msg = Mail.get_iso_date(self.msg)
        wikison = f"""{{{{mail
|user={self.user}
|id={self.mailid}
|from={self.getHeader('From')}
|to={self.getHeader('To')}
|subject={self.getHeader('Subject')}
|date={iso_date}
}}}}"""
        return wikison

    def table_line(self, key, value):
        """Generate a table row with a key and value."""
        return f"<tr><th>{key}:</th><td>{value}</td><tr>"

    def mail_part_row(self, loop_index: int, part):
        """
        Generate a table row for a mail part.

        Args:
            loop_index(int): the zero based index of the part - for index 0
                the table header row is prepended
            part: the message part - its filename and length attributes are
                expected to be set (extract_message does this)

        Returns:
            str: the html markup for the (header and) part row
        """
        header = ""
        if self.mailid:
            mailid = self.mailid.replace(">", "").replace("<", "")
        else:
            mailid = "unknown-mailid"
        # Check if loop_index is 0 to add a header
        if loop_index == 0:
            header = "<tr><th>#</th><th>Content Type</th><th>Charset</th><th>Filename</th><th style='text-align:right'>Length</th></tr>"
        link = Link.create(f"/part/{self.user}/{mailid}/{loop_index}", part.filename)
        # Generate the row for the current part - terminated by a closing </tr>
        # NOTE(review): the "</a>" after the link looks redundant if Link.create
        # already returns a closed anchor - confirm before removing it
        row = f"<tr><th>{loop_index+1}:</th><td>{part.get_content_type()}</td><td>{part.get_content_charset()}</td><td>{link}</a></td><td style='text-align:right'>{part.length}</td></tr>"
        return header + row

    def as_html_section(self, section_name):
        """
        convert my content to the given html section

        Args:
            section_name(str): the name of the section to create
        """
        html_parts = []
        # Start building the HTML string
        table_sections = ["info", "parts", "headers"]
        if section_name in table_sections:
            html_parts.append("<hr>")
            html_parts.append(f"<table id='{section_name}Table'>")
        if section_name == "title":
            if self.mailid:
                html_parts.append(f"<h2>{self.mailid}</h2>")
        elif section_name == "wiki":
            html_parts.append(f"<hr><pre>{self.asWikiMarkup()}</pre>")
        elif section_name == "info":
            html_parts.append(self.table_line("User", self.user))
            html_parts.append(self.table_line("Folder", self.folder_path))
            html_parts.append(self.table_line("From", self.fromUrl))
            html_parts.append(self.table_line("To", self.toUrl))
            html_parts.append(self.table_line("Date", self.getHeader("Date")))
            html_parts.append(self.table_line("Subject", self.getHeader("Subject")))
            html_parts.append(
                self.table_line("Message-ID", self.getHeader("Message-ID"))
            )
        elif section_name == "headers":
            for key, value in self.headers.items():
                html_parts.append(self.table_line(key, value))
        # Closing t
        elif section_name == "parts":
            for index, part in enumerate(self.msgParts):
                html_parts.append(self.mail_part_row(index, part))
        elif section_name == "text":
            # Add raw message parts if necessary
            html_parts.append(f"<hr><p id='txtMsg'>{self.txtMsg}</p>")
        elif section_name == "html":
            html_parts.append(f"<hr><div id='htmlMsg'>{self.html}</div>")
        if section_name in table_sections:
            # Closing tables
            html_parts.append("</table>")
        markup = "".join(html_parts)
        return markup

    def as_html(self):
        """Generate the HTML representation of the mail."""
        html = ""
        for section_name in ["title", "info", "parts", "text", "html"]:
            html += self.as_html_section(section_name)
        return html

    def part_as_fileresponse(
        self, part_index: int, attachments_path: str = None
    ) -> Any:
        """
        Return the specified part of a message as a FileResponse.

        Args:
            part_index (int): The index of the part to be returned.

        Returns:
            FileResponse: A FileResponse object representing the specified part.

        Raises:
            IndexError: If the part_index is out of range of the message parts.
            ValueError: If the part content is not decodable.

        Note:
            The method assumes that self.msgParts is a list-like container holding the message parts.
            Since FastAPI's FileResponse is designed to work with file paths, this function writes the content to a temporary file.
        """
        # Check if part_index is within the range of msgParts
        if not 0 <= part_index < len(self.msgParts):
            raise IndexError("part_index out of range.")

        # Get the specific part from the msgParts
        part = self.msgParts[part_index]

        # Get the content of the part, decode if necessary
        try:
            content = part.get_payload(decode=True)
        except:
            raise ValueError("Unable to decode part content.")

        # Write content to a temporary file
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file_name = temp_file.name
            temp_file.write(content)

        # Create and return a FileResponse object
        file_response = FileResponse(
            path=temp_file_name, filename=part.get_filename() or "file"
        )

        # Delete the temporary file after sending the response
        async def on_send_response() -> None:
            os.unlink(temp_file_name)

        file_response.background = on_send_response

        # response=StreamingResponse(io.BytesIO(content),media_type=)
        return file_response

    @staticmethod
    def toSbdFolder(folderURI):
        """
        get the SBD folder for the given folderURI as a tuple

        Args:
            folderURI(str): the folder uri
        Returns:
            sbdFolder(str): the prefix
            folder(str): the local path
        """
        folder = folderURI.replace("mailbox://nobody@", "")
        # https://stackoverflow.com/a/14007559/1497139
        parts = folder.split("/")
        sbdFolder = "/Mail/"
        folder = ""
        for i, part in enumerate(parts):
            if i == 0:  # e.g. "Local Folders" ...
                sbdFolder += f"{part}/"
            elif i < len(parts) - 1:
                sbdFolder += f"{part}.sbd/"
                folder += f"{part}/"
            else:
                sbdFolder += f"{part}"
                folder += f"{part}"
        return sbdFolder, folder

    @staticmethod
    def create_message(frm, to, content, headers=None):
        if not headers:
            headers = {}
        m = mailbox.Message()
        m["from"] = frm
        m["to"] = to
        for h, v in headers.items():
            m[h] = v
        m.set_payload(content)
        return m

__init__(user, mailid, tb=None, debug=False, keySearch=True)

Constructor

Parameters:

Name Type Description Default
user(string)

userid of the user

required
mailid(string)

unique id of the mail

required
debug(bool)

True if debugging should be activated

False
keySearch(bool)

True if a slow keySearch should be tried when lookup fails

True
Source code in thunderbird/mail.py
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
def __init__(self, user, mailid, tb=None, debug=False, keySearch=True):
    """
    Constructor

    Looks up the mail record for the given mail id and - if a record is
    found - tries to load the message from the mailbox the record points to.

    Args:
        user(string): userid of the user
        mailid(string): unique id of the mail
        tb: the Thunderbird instance to use - if None it is obtained via Thunderbird.get(user)
        debug(bool): True if debugging should be activated
        keySearch(bool): True if a slow keySearch should be tried when lookup fails
    """
    self.debug = debug
    self.user = user
    if tb is None:
        self.tb = Thunderbird.get(user)
    else:
        self.tb = tb
    mailid = Mail.normalize_mailid(mailid)
    self.mailid = mailid
    self.keySearch = keySearch
    self.rawMsg = None
    self.msg = None
    self.headers = {}
    self.fromUrl = None
    self.fromMailTo = None
    self.toUrl = None
    self.toMailTo = None
    # attributes read by the rendering methods must exist
    # even if the mail is not found
    self.folder_path = None
    self.txtMsg = ""
    self.html = ""
    self.msgParts = []
    mail_record = self.search()
    if mail_record is not None:
        mail_lookup = MailLookup.from_mail_record(mail_record)
        self.folder_path = mail_lookup.folder_path
        folderPath = self.tb.local_folders + mail_lookup.folder_path
        tb_mbox = ThunderbirdMailbox(self.tb, folderPath, debug=self.debug)
        try:
            found = False
            if (
                mail_lookup.start_pos is not None
                and mail_lookup.stop_pos is not None
            ):
                self.msg = tb_mbox.get_message_by_pos(
                    mail_lookup.start_pos, mail_lookup.stop_pos
                )
                found = self.check_mailid()
            if not found:
                # Fallback to other methods if start_pos and stop_pos are not available
                # forget the headers of a rejected candidate message
                self.headers = {}
                self.msg = tb_mbox.get_message_by_key(mail_lookup.message_index)
            # if lookup fails we might loop thru
            # all messages if this option is active ...
            found = self.check_mailid()
            if not found and self.keySearch:
                self.headers = {}
                self.msg = tb_mbox.search_message_by_key(self.mailid)
            if self.msg is not None:
                if self.check_mailid():
                    self.extract_message()
                else:
                    self.msg = None
                    self.headers = {}
        finally:
            # release the mailbox even if reading a message failed
            tb_mbox.close()

asWikiMarkup()

convert me to wiki markup in Wikison notation

Returns:

Name Type Description
str str

a http://wiki.bitplan.com/index.php/WikiSon notation

Source code in thunderbird/mail.py
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
    def asWikiMarkup(self) -> str:
        """
        convert me to wiki markup in Wikison notation

        Returns:
            str: a http://wiki.bitplan.com/index.php/WikiSon notation
        """
        if len(self.headers) == 0:
            self.extract_headers()
        _msg_date, iso_date, _error_msg = Mail.get_iso_date(self.msg)
        wikison = f"""{{{{mail
|user={self.user}
|id={self.mailid}
|from={self.getHeader('From')}
|to={self.getHeader('To')}
|subject={self.getHeader('Subject')}
|date={iso_date}
}}}}"""
        return wikison

as_html()

Generate the HTML representation of the mail.

Source code in thunderbird/mail.py
1566
1567
1568
1569
1570
1571
def as_html(self):
    """
    Generate the HTML representation of the mail.

    Returns:
        str: the markup of the title, info, parts, text and html sections
            in this order
    """
    section_names = ("title", "info", "parts", "text", "html")
    return "".join(self.as_html_section(name) for name in section_names)

as_html_error_msg()

Generates an HTML formatted error message for the Mail instance.

This method should be called when a Mail object is requested but not found. It uses the normalize_mailid method to format the mail ID in the error message.

Returns:

Name Type Description
str str

An HTML string representing the error message.

Source code in thunderbird/mail.py
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
def as_html_error_msg(self) -> str:
    """
    Generates an HTML formatted error message for the Mail instance.

    This method should be called when a Mail object is requested but not found.
    It uses the `normalize_mailid` method to format the mail ID in the error message
    and escapes the result so that the id can not inject markup.

    Returns:
        str: An HTML string representing the error message.
    """
    from html import escape

    normalized_mailid = Mail.normalize_mailid(self.mailid)
    # the id is not guaranteed to be HTML safe
    safe_mailid = escape(normalized_mailid)
    html_error_msg = f"<span style='color: red;'>Mail with id {safe_mailid} not found</span>"
    return html_error_msg

as_html_section(section_name)

convert my content to the given html section

Parameters:

Name Type Description Default
section_name(str)

the name of the section to create

required
Source code in thunderbird/mail.py
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
def as_html_section(self, section_name):
    """
    convert my content to the given html section

    Attributes that are only set once a message has been loaded
    (folder_path, msgParts, txtMsg, html) are read with defaults so that
    a section can also be rendered for a mail without message.
    The plain text body is escaped before it is embedded.

    Args:
        section_name(str): the name of the section to create - one of
            "title", "wiki", "info", "headers", "parts", "text", "html";
            any other name leads to empty markup

    Returns:
        str: the html markup of the section
    """
    from html import escape as html_escape

    html_parts = []
    # Start building the HTML string
    table_sections = ["info", "parts", "headers"]
    is_table = section_name in table_sections
    if is_table:
        html_parts.append("<hr>")
        html_parts.append(f"<table id='{section_name}Table'>")
    if section_name == "title":
        if self.mailid:
            html_parts.append(f"<h2>{self.mailid}</h2>")
    elif section_name == "wiki":
        html_parts.append(f"<hr><pre>{self.asWikiMarkup()}</pre>")
    elif section_name == "info":
        folder_path = getattr(self, "folder_path", None)
        html_parts.append(self.table_line("User", self.user))
        html_parts.append(self.table_line("Folder", folder_path))
        html_parts.append(self.table_line("From", self.fromUrl))
        html_parts.append(self.table_line("To", self.toUrl))
        html_parts.append(self.table_line("Date", self.getHeader("Date")))
        html_parts.append(self.table_line("Subject", self.getHeader("Subject")))
        html_parts.append(
            self.table_line("Message-ID", self.getHeader("Message-ID"))
        )
    elif section_name == "headers":
        for key, value in self.headers.items():
            html_parts.append(self.table_line(key, value))
    elif section_name == "parts":
        for index, part in enumerate(getattr(self, "msgParts", [])):
            html_parts.append(self.mail_part_row(index, part))
    elif section_name == "text":
        # plain text is not markup - escape &, < and >
        txt_msg = html_escape(getattr(self, "txtMsg", ""), quote=False)
        html_parts.append(f"<hr><p id='txtMsg'>{txt_msg}</p>")
    elif section_name == "html":
        html_parts.append(f"<hr><div id='htmlMsg'>{getattr(self, 'html', '')}</div>")
    if is_table:
        # Closing tables
        html_parts.append("</table>")
    markup = "".join(html_parts)
    return markup

check_mailid()

check the mailid

Source code in thunderbird/mail.py
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
def check_mailid(self) -> bool:
    """
    check whether the current message carries my mail id

    The headers are refreshed via extract_headers and every header whose
    name equals "message-id" ignoring case is compared with self.mailid
    after normalizing it.

    Returns:
        bool: True if any message id header fits self.mailid
    """
    self.extract_headers()
    # header names should be treated case insensitive - self.headers is
    # keyed by the spelling found in the message so compare lowercased
    found = any(
        self.normalize_mailid(header_value) == self.mailid
        for header_name, header_value in self.headers.items()
        if header_name.lower() == "message-id"
    )
    return found

extract_headers()

update the headers

Source code in thunderbird/mail.py
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
def extract_headers(self):
    """
    update the headers

    self.headers is rebuilt from the current message so that no header of
    a previously inspected message survives; without a message the
    headers are empty.
    """
    headers = {}
    if self.msg:
        for key in self.msg.keys():
            # https://stackoverflow.com/a/21715870/1497139
            headers[key] = str(make_header(decode_header(self.msg.get(key))))
    self.headers = headers

extract_message(lenient=False)

Extracts the message body and headers from the email message.

This method decodes each part of the email message, handling different content types and charsets. It appends the decoded text to the message object's text and HTML attributes.

Parameters:

Name Type Description Default
lenient bool

If True, the method will not raise an exception for decoding errors, and will instead skip the problematic parts.

False
Source code in thunderbird/mail.py
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
def extract_message(self, lenient: bool = False) -> None:
    """
    Extracts the message body and headers from the email message.

    This method walks all parts of the email message, records them in
    self.msgParts (each part gets a `length` and a `filename` attribute)
    and appends the decoded text/plain and text/html content to
    self.txtMsg and self.html.

    Args:
        lenient (bool): If True, the method will not raise an exception for decoding errors, and will instead skip the problematic parts.

    """
    if len(self.headers) == 0:
        self.extract_headers()
    self.txtMsg = ""
    self.html = ""
    # https://stackoverflow.com/a/43833186/1497139
    self.msgParts = []
    # decode parts
    # https://stackoverflow.com/questions/59554237/how-to-handle-all-charset-and-content-type-when-reading-email-from-imap-lib-in-p
    # https://gist.github.com/miohtama/5389146
    for part in self.msg.walk():
        self.msgParts.append(part)
        # a part without any payload has no length to measure
        payload = part._payload
        part.length = len(payload) if payload is not None else 0
        # each part is a either non-multipart, or another multipart message
        # that contains further parts... Message is organized like a tree
        contentType = part.get_content_type()
        charset = part.get_content_charset()
        if charset is None:
            charset = "utf-8"
        partname = part.get_param("name")
        # the part index used for default names is 1-based
        part.filename = self.fixedPartName(
            partname, contentType, len(self.msgParts)
        )
        if contentType == "text/plain" or contentType == "text/html":
            part_bytes = part.get_payload(decode=True)
            if part_bytes is None:
                # nothing to decode for this part
                continue
            rawPart = self.try_decode(part_bytes, charset, lenient)
            if rawPart is not None:
                if contentType == "text/plain":
                    self.txtMsg += rawPart
                else:
                    self.html += rawPart
    self.handle_headers()

fixedPartName(partname, contentType, partIndex)

get a fixed version of the partname

Parameters:

Name Type Description Default
partname(str)

the name of the part

required
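contentType(str)

the content type used to guess a file extension when there is no partname

required
partIndex(int)

the index of the part used to build the default file name

required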
Source code in thunderbird/mail.py
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
def fixedPartName(self, partname: str, contentType: str, partIndex: int):
    """
    get a fixed version of the partname

    Args:
        partname(str): the name of the part - may be empty or a 3-tuple
            whose last element is the name
        contentType(str): the content type used to guess a file extension
            when there is no partname
        partIndex(int): the index of the part used for the default name

    Returns:
        str: the decoded partname or "part{partIndex}{ext}" passed through fix_text
    """
    if not partname:
        # no name given - derive one from the part index and content type
        mime_type = contentType.partition(";")[0].strip()
        ext = guess_extension(mime_type)
        if ext is None:
            ext = ".txt"
        return fix_text(f"part{partIndex}{ext}")
    # avoid TypeError: expected string or bytes-like object
    if type(partname) is tuple:
        _encoding, _unknown, partname = partname
    decoded_name = str(make_header(decode_header(partname)))
    return fix_text(decoded_name)

getHeader(headerName)

get the header with the given name

Parameters:

Name Type Description Default
headerName(str)

the name of the header

required

Returns:

Name Type Description
str

the header value

Source code in thunderbird/mail.py
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
def getHeader(self, headerName: str):
    """
    look up the header with the given name

    Args:
        headerName(str): the name of the header

    Returns:
        str: the header value or "?" if there is no such header
    """
    return self.headers.get(headerName, "?")

get_iso_date(msg) classmethod

Extracts and formats the date from the email header in ISO format.

Parameters:

Name Type Description Default
msg Mail

The mail object from which to extract the date.

required

Returns:

Type Description
str

Tuple[str, Optional[str], Optional[str]]: A tuple containing the msg_date, the formatted date in ISO format,

Optional[str]

and an error message if the date cannot be extracted or parsed, otherwise None.

Source code in thunderbird/mail.py
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
@classmethod
def get_iso_date(cls, msg) -> Tuple[str, Optional[str], Optional[str]]:
    """
    Read the "Date" header of a mail and convert it to ISO format.

    Args:
        msg (Mail): The mail object from which to extract the date.

    Returns:
        Tuple[str, Optional[str], Optional[str]]: the raw value of the date
        header ("" if there is none), the date in ISO format ("?" if it is
        missing or could not be parsed) and an error message which is None
        unless parsing failed.
    """
    parser = DateParser()
    raw_date = msg.get("Date", "")
    if not raw_date:
        # no date header - nothing to parse and nothing to complain about
        return raw_date, "?", None
    try:
        return raw_date, parser.parse_date(raw_date), None
    except Exception as ex:
        return raw_date, "?", f"Error parsing date '{raw_date}': {ex}"

mail_part_row(loop_index, part)

Generate a table row for a mail part.

Source code in thunderbird/mail.py
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
def mail_part_row(self, loop_index: int, part):
    """
    Generate a table row for a mail part.

    Args:
        loop_index(int): the zero based index of the part; for index 0 a
            header row is put in front of the part row
        part: the mail part; its get_content_type(), get_content_charset(),
            filename and length are displayed

    Returns:
        str: the HTML markup of the optional header row and the part row
    """
    if self.mailid:
        mailid = self.mailid.replace(">", "").replace("<", "")
    else:
        mailid = "unknown-mailid"
    # only the first part row is preceded by the column headers
    header = ""
    if loop_index == 0:
        header = "<tr><th>#</th><th>Content Type</th><th>Charset</th><th>Filename</th><th style='text-align:right'>Length</th></tr>"
    link = Link.create(f"/part/{self.user}/{mailid}/{loop_index}", part.filename)
    # NOTE(review): Link.create is assumed to return a complete anchor element,
    # so no extra closing </a> is emitted; the row is closed with </tr>
    row = f"<tr><th>{loop_index+1}:</th><td>{part.get_content_type()}</td><td>{part.get_content_charset()}</td><td>{link}</td><td style='text-align:right'>{part.length}</td></tr>"
    return header + row

normalize_mailid(mail_id) classmethod

remove the surrounding <> of the given mail_id

Source code in thunderbird/mail.py
1262
1263
1264
1265
1266
1267
1268
@classmethod
def normalize_mailid(cls, mail_id: str) -> str:
    """
    strip the surrounding <> from the given mail_id

    Args:
        mail_id(str): the mail id, e.g. "<abc@example.org>"

    Returns:
        str: the mail id with the angle brackets removed
    """
    bracketed = re.compile(r"\<(.*)\>")
    return bracketed.sub(r"\1", mail_id)

part_as_fileresponse(part_index, attachments_path=None)

Return the specified part of a message as a FileResponse.

Parameters:

Name Type Description Default
part_index int

The index of the part to be returned.

required

Returns:

Name Type Description
FileResponse Any

A FileResponse object representing the specified part.

Raises:

Type Description
IndexError

If the part_index is out of range of the message parts.

ValueError

If the part content is not decodable.

Note

The method assumes that self.msgParts is a list-like container holding the message parts. Since FastAPI's FileResponse is designed to work with file paths, this function writes the content to a temporary file.

Source code in thunderbird/mail.py
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
def part_as_fileresponse(
    self, part_index: int, attachments_path: str = None
) -> Any:
    """
    Return the specified part of a message as a FileResponse.

    Args:
        part_index (int): The index of the part to be returned.
        attachments_path (str, optional): currently not used by this method.

    Returns:
        FileResponse: A FileResponse object representing the specified part.

    Raises:
        IndexError: If the part_index is out of range of the message parts.
        ValueError: If the part content is not decodable or the part has no
            decodable payload.

    Note:
        The method assumes that self.msgParts is a list-like container holding the message parts.
        Since FastAPI's FileResponse is designed to work with file paths, this function writes the content to a temporary file.
    """
    # Check if part_index is within the range of msgParts
    if not 0 <= part_index < len(self.msgParts):
        raise IndexError("part_index out of range.")

    # Get the specific part from the msgParts
    part = self.msgParts[part_index]

    # Get the content of the part, decode if necessary;
    # only real errors are converted - KeyboardInterrupt/SystemExit pass through
    try:
        content = part.get_payload(decode=True)
    except Exception as ex:
        raise ValueError("Unable to decode part content.") from ex
    if content is None:
        # email.message.Message.get_payload(decode=True) returns None for
        # multipart containers - fail before a temporary file is created
        raise ValueError("Unable to decode part content.")

    # Write content to a temporary file that survives the with block
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_file_name = temp_file.name
        temp_file.write(content)

    # Create and return a FileResponse object
    file_response = FileResponse(
        path=temp_file_name, filename=part.get_filename() or "file"
    )

    # Delete the temporary file after sending the response
    async def on_send_response() -> None:
        os.unlink(temp_file_name)

    file_response.background = on_send_response

    return file_response

search(use_index_db=True)

Search for an email by its ID in the specified Thunderbird mailbox database.

This method allows searching either the gloda database or the index database based on the use_index_db parameter. It returns a dictionary representing the found email or None if not found.

Parameters:

Name Type Description Default
use_index_db bool

If True (default), the search will be performed in the index database if it exists. If False, the search will be performed in the gloda database.

True

Returns:

Type Description
Optional[Dict[str, Any]]

Optional[Dict[str, Any]]: A dictionary representing the found email, or None if not found.

Source code in thunderbird/mail.py
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
def search(self, use_index_db: bool = True) -> Optional[Dict[str, Any]]:
    """
    Search for an email by its ID in the specified Thunderbird mailbox database.

    This method allows searching either the gloda database or the index database based on the `use_index_db` parameter.
    It returns a dictionary representing the found email or None if not found.

    Args:
        use_index_db (bool): If True (default), the search will be performed in the index database
                             provided that it exists.
                             If False or if there is no index database, the search will be performed
                             in the gloda database.

    Returns:
        Optional[Dict[str, Any]]: A dictionary representing the found email, or None if not found.
            The record gets a "source" entry ("index_db" or "gloda") and a "message_id" entry
            if the query result has none.
    """
    if self.debug:
        print(f"Searching for mail with id {self.mailid} for user {self.user}")

    # decide only once which database is used so that the query and the
    # database it is run against can not get out of sync
    if use_index_db and self.tb.index_db_exists():
        # Query for the index database
        query = """SELECT * FROM mail_index 
                   WHERE message_id = ?"""
        source = "index_db"
        # the index database is queried with the mail id wrapped in <>
        params = (f"<{self.mailid}>",)
        db = self.tb.index_db
    else:
        # Query for the gloda database
        query = """SELECT m.*, f.* 
                   FROM messages m JOIN
                        folderLocations f ON m.folderId = f.id
                   WHERE m.headerMessageID = (?)"""
        source = "gloda"
        params = (self.mailid,)
        db = self.tb.sqlDB

    maillookup = db.query(query, params)

    if self.debug:
        print(maillookup)

    # only the first hit is of interest
    mail_record = maillookup[0] if maillookup else None
    if mail_record:
        mail_record["source"] = source
        if "message_id" not in mail_record:
            mail_record["message_id"] = self.mailid
    return mail_record

table_line(key, value)

Generate a table row with a key and value.

Source code in thunderbird/mail.py
1501
1502
1503
def table_line(self, key, value):
    """
    Generate a table row with a key and value.

    Args:
        key: the content of the header cell; a colon is appended
        value: the content of the data cell

    Returns:
        str: the HTML markup of the table row
    """
    # the row has to end with a closing </tr> tag
    return f"<tr><th>{key}:</th><td>{value}</td></tr>"

toSbdFolder(folderURI) staticmethod

get the SBD folder for the given folderURI as a tuple

Parameters:

Name Type Description Default
folderURI(str)

the folder uri

required

Returns: a tuple (sbdFolder, folder) with sbdFolder(str): the prefix and folder(str): the local path

Source code in thunderbird/mail.py
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
@staticmethod
def toSbdFolder(folderURI):
    """
    convert the given folderURI to its SBD folder and its local path

    Args:
        folderURI(str): the folder uri
    Returns:
        sbdFolder(str): the prefix - "/Mail/" followed by the first path
            element (e.g. "Local Folders") and the remaining elements where
            every element but the last one gets a ".sbd" suffix
        folder(str): the local path - the path elements after the first one
    """
    mailbox_path = folderURI.replace("mailbox://nobody@", "")
    # https://stackoverflow.com/a/14007559/1497139
    account, *subfolders = mailbox_path.split("/")
    # joining with ".sbd/" suffixes every level except the last one
    sbdFolder = f"/Mail/{account}/" + ".sbd/".join(subfolders)
    folder = "/".join(subfolders)
    return sbdFolder, folder

try_decode(byte_str, charset, lenient)

Attempts to decode a byte string using multiple charsets.

Tries to decode the byte string using a series of common charsets, returning the decoded string upon success. If all attempts fail, it either raises a UnicodeDecodeError or returns None, based on the lenient flag.

Parameters:

Name Type Description Default
byte_str bytes

The byte string to be decoded.

required
charset str

The initial charset to attempt decoding with.

required
lenient bool

If True, suppresses UnicodeDecodeError and returns None for undecodable byte strings.

required

Returns:

Name Type Description
str str

The decoded string, or None if lenient is True and decoding fails.

Raises:

Type Description
UnicodeDecodeError

If decoding fails and lenient is False.

Source code in thunderbird/mail.py
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
def try_decode(self, byte_str: bytes, charset: str, lenient: bool) -> Optional[str]:
    """
    Attempts to decode a byte string using multiple charsets.

    Tries to decode the byte string using the given charset followed by a series of common charsets,
    returning the decoded string upon success. A missing charset is skipped and a charset
    that is unknown to python is treated like a failed decoding attempt.
    If all attempts fail, it either raises a UnicodeDecodeError or returns None, based on the lenient flag.

    Args:
        byte_str (bytes): The byte string to be decoded.
        charset (str): The initial charset to attempt decoding with (may be None).
        lenient (bool): If True, suppresses UnicodeDecodeError and returns None for undecodable byte strings.

    Returns:
        Optional[str]: The decoded string, or None if lenient is True and decoding fails.

    Raises:
        UnicodeDecodeError: If decoding fails and lenient is False.

    """
    charsets_to_try = [charset, "utf-8", "iso-8859-1", "ascii"]
    # Ensure no duplicate charsets in the list (keeping the order) and
    # drop a missing charset which can not be passed to decode
    unique_charsets = [cs for cs in dict.fromkeys(charsets_to_try) if cs]

    for encoding in unique_charsets:
        try:
            return byte_str.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            # LookupError: the declared charset is not known to python
            continue

    if not lenient:
        # UnicodeDecodeError needs encoding, object, start, end and reason
        raise UnicodeDecodeError(
            unique_charsets[0],
            byte_str,
            0,
            len(byte_str),
            f"Failed to decode with charsets: {unique_charsets}",
        )
    return None

MailArchive dataclass

Represents a single mail archive for a user.

Attributes:

Name Type Description
user str

The user to whom the mail archive belongs.

gloda_db_path str

The file path of the mailbox global database (sqlite).

index_db_path str

The file path of the mailbox index database (sqlite).

profile str

the Thunderbird profile directory of this mailbox.

gloda_db_update_time str

The last update time of the global database, default is None.

index_db_update_time str

The last update time of the index database, default is None.

Source code in thunderbird/mail.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@dataclass
class MailArchive:
    """
    A single Thunderbird mail archive of a user.

    Attributes:
        user (str): The user to whom the mail archive belongs.
        gloda_db_path (str): The file path of the mailbox global database (sqlite).
        index_db_path (str, optional): The file path of the mailbox index database (sqlite);
            always set to "index_db.sqlite" next to the global database after initialization.
        profile (str, optional): the Thunderbird profile directory of this mailbox.
        gloda_db_update_time (str, optional): The last update time of the global database, default is None.
        index_db_update_time (str, optional): The last update time of the index database, default is None.
    """

    user: str
    gloda_db_path: str
    index_db_path: str = None
    profile: str = None
    gloda_db_update_time: str = None
    index_db_update_time: str = None

    def index_db_exists(self) -> bool:
        """Tell whether there is a non-empty index database file.

        Returns:
            bool: True if the index database file exists and has a size greater than zero, otherwise False.
        """
        is_file = os.path.isfile(self.index_db_path)
        return is_file and os.path.getsize(self.index_db_path) > 0

    def __post_init__(self):
        """
        Derive the index database path and the database update times
        once the dataclass fields are set.
        """
        self.gloda_db_update_time = self._get_file_update_time(self.gloda_db_path)
        # the index database is located next to the global database
        db_dir = os.path.dirname(self.gloda_db_path)
        self.index_db_path = os.path.join(db_dir, "index_db.sqlite")
        if self.index_db_exists():
            self.index_db_update_time = self._get_file_update_time(self.index_db_path)

    def _get_file_update_time(self, file_path: str) -> str:
        """
        Get the modification time of the given file as a formatted string.

        Args:
            file_path (str): The path of the file for which to get the update time.

        Returns:
            str: The last update time in the format YYYY-MM-DD HH:MM:SS.
        """
        modified = datetime.fromtimestamp(os.path.getmtime(file_path))
        return modified.strftime("%Y-%m-%d %H:%M:%S")

    def to_dict(self, index: int = None) -> Dict[str, str]:
        """
        Convert the mail archive to a dictionary with the user, the shortened
        profile name and the update times of both databases.

        Args:
            index (int, optional): An optional index to be added as the first entry "#".

        Returns:
            Dict[str, str]: The dictionary representation of the mail archive.
        """
        profile_name = os.path.basename(self.profile) if self.profile else None
        # only the part in front of the first dot of the profile name is shown
        if profile_name and "." in profile_name:
            profile_key = profile_name.split(".")[0]
        else:
            profile_key = profile_name
        record = {}
        if index is not None:
            record["#"] = index
        record["user"] = self.user
        record["profile"] = profile_key
        record["gloda_updated"] = self.gloda_db_update_time
        record["index_updated"] = self.index_db_update_time or "-"
        return record

__post_init__()

Post-initialization processing to set the database update times.

Source code in thunderbird/mail.py
71
72
73
74
75
76
77
78
79
80
def __post_init__(self):
    """
    Derive the index database path and the database update times
    once the dataclass fields are set.
    """
    self.gloda_db_update_time = self._get_file_update_time(self.gloda_db_path)
    # the index database is located next to the global database
    db_dir = os.path.dirname(self.gloda_db_path)
    self.index_db_path = os.path.join(db_dir, "index_db.sqlite")
    if self.index_db_exists():
        self.index_db_update_time = self._get_file_update_time(self.index_db_path)

index_db_exists()

Checks if the index database file exists and is not empty.

Returns:

Name Type Description
bool bool

True if the index database file exists and has a size greater than zero, otherwise False.

Source code in thunderbird/mail.py
58
59
60
61
62
63
64
65
66
67
68
69
def index_db_exists(self) -> bool:
    """Tell whether there is a non-empty index database file.

    Returns:
        bool: True if the index database file exists and has a size greater than zero, otherwise False.
    """
    db_path = self.index_db_path
    if not os.path.isfile(db_path):
        return False
    # an empty file does not count as an existing index database
    return os.path.getsize(db_path) > 0

to_dict(index=None)

Converts the mail archive data to a dictionary, including the update times for both databases.

Parameters:

Name Type Description Default
index int

An optional index to be added to the dictionary.

None

Returns:

Type Description
Dict[str, str]

Dict[str, str]: The dictionary representation of the mail archive.

Source code in thunderbird/mail.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def to_dict(self, index: int = None) -> Dict[str, str]:
    """
    Convert the mail archive to a dictionary with the user, the shortened
    profile name and the update times of both databases.

    Args:
        index (int, optional): An optional index to be added as the first entry "#".

    Returns:
        Dict[str, str]: The dictionary representation of the mail archive.
    """
    profile_name = os.path.basename(self.profile) if self.profile else None
    # only the part in front of the first dot of the profile name is shown
    if profile_name and "." in profile_name:
        profile_key = profile_name.split(".")[0]
    else:
        profile_key = profile_name
    record = {}
    if index is not None:
        record["#"] = index
    record["user"] = self.user
    record["profile"] = profile_key
    record["gloda_updated"] = self.gloda_db_update_time
    record["index_updated"] = self.index_db_update_time or "-"
    return record

MailArchives

Manages a collection of MailArchive instances for different users.

Attributes:

Name Type Description
user_list List[str]

A list of user names.

mail_archives Dict[str, MailArchive]

A dictionary mapping users to their MailArchive instances.

Source code in thunderbird/mail.py
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
class MailArchives:
    """
    A collection of the mail archives of several users.

    Attributes:
        user_list (List[str]): A list of user names.
        mail_archives (Dict[str, MailArchive]): A dictionary mapping users to their MailArchive instances.
    """

    def __init__(self, user_list: List[str]):
        """
        Set up the mail archives for the given users.

        Args:
            user_list (List[str]): A list of user names.
        """
        self.user_list = user_list
        self.mail_archives = self._create_mail_archives()

    def _create_mail_archives(self) -> Dict[str, MailArchive]:
        """
        Look up the archive of each user in the user list via Thunderbird.get.

        Returns:
            Dict[str, MailArchive]: the archives by user name
        """
        # Thunderbird.get(user) is assumed to return a Thunderbird instance
        # with a valid mailbox DB path attribute
        return {user: Thunderbird.get(user) for user in self.user_list}

    def as_view_lod(self) -> List[Dict[str, str]]:
        """
        Convert the mail archives to a list of dictionaries for display.

        Returns:
            List[Dict[str,str]]: A list of dictionaries, each representing a mail archive
            with links to its profile, mailboxes, search and index pages.
        """
        view_records = []
        for position, archive in enumerate(self.mail_archives.values(), start=1):
            record = archive.to_dict(position)
            user = record["user"]
            profile = record["profile"]
            profile_url = f"/profile/{user}/{profile}"
            record.update(
                {
                    "profile": Link.create(profile_url, profile),
                    "mailboxes": Link.create(f"{profile_url}/mailboxes", "mailboxes"),
                    "search": Link.create(f"{profile_url}/search", "search"),
                    "index": Link.create(f"{profile_url}/index", "index"),
                }
            )
            # add restful call to update index
            view_records.append(record)
        return view_records

__init__(user_list)

Initializes the MailArchives with a list of users.

Parameters:

Name Type Description Default
user_list List[str]

A list of user names.

required
Source code in thunderbird/mail.py
733
734
735
736
737
738
739
740
741
def __init__(self, user_list: List[str]):
    """
    Set up the mail archives for the given users.

    Args:
        user_list (List[str]): A list of user names.
    """
    # the user list has to be set first since the archives are created from it
    self.user_list = user_list
    archives = self._create_mail_archives()
    self.mail_archives = archives

as_view_lod()

Creates a list of dictionaries representation of the mail archives.

Returns:

Type Description
List[Dict[str, str]]

List[Dict[str,str]]: A list of dictionaries, each representing a mail archive.

Source code in thunderbird/mail.py
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
def as_view_lod(self) -> List[Dict[str, str]]:
    """
    Convert the mail archives to a list of dictionaries for display.

    Returns:
        List[Dict[str,str]]: A list of dictionaries, each representing a mail archive
        with links to its profile, mailboxes, search and index pages.
    """
    view_records = []
    for position, archive in enumerate(self.mail_archives.values(), start=1):
        record = archive.to_dict(position)
        user = record["user"]
        profile = record["profile"]
        profile_url = f"/profile/{user}/{profile}"
        record.update(
            {
                "profile": Link.create(profile_url, profile),
                "mailboxes": Link.create(f"{profile_url}/mailboxes", "mailboxes"),
                "search": Link.create(f"{profile_url}/search", "search"),
                "index": Link.create(f"{profile_url}/index", "index"),
            }
        )
        # add restful call to update index
        view_records.append(record)
    return view_records

MailLookup dataclass

A data class representing a mail lookup entity.

Attributes:

Name Type Description
message_index int

The index of the message.

folder_path str

The path to the folder containing the message.

message_id str

The unique identifier of the message.

start_pos int

The start byte position of the message in the mailbox file.

stop_pos int

The stop byte position of the message in the mailbox file.

Source code in thunderbird/mail.py
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
@dataclass
class MailLookup:
    """
    The information needed to locate a mail in a mailbox.

    Attributes:
        message_index (int): The index of the message.
        folder_path (str): The path to the folder containing the message.
        message_id (str): The unique identifier of the message.
        start_pos (int, optional): The start byte position of the message in the mailbox file.
        stop_pos (int, optional): The stop byte position of the message in the mailbox file.
    """

    message_index: int
    folder_path: str
    message_id: str
    start_pos: Optional[int] = None
    stop_pos: Optional[int] = None

    @classmethod
    def from_gloda_record(cls, mail_record: Dict) -> "MailLookup":
        """
        Build a MailLookup from a record of the gloda database.

        Args:
            mail_record (Dict): A dictionary representing a Gloda record with
                the keys "message_id", "folderURI" and "messageKey".

        Returns:
            MailLookup: An instance of MailLookup without byte positions.
        """
        message_id = mail_record["message_id"]
        quoted_uri = mail_record["folderURI"]
        message_index = int(mail_record["messageKey"])
        # the folder URI is percent-encoded in the record
        sbd_folder, _folder = Mail.toSbdFolder(urllib.parse.unquote(quoted_uri))
        return cls(
            message_index,
            ThunderbirdMailbox.as_relative_path(sbd_folder),
            message_id,
        )

    @classmethod
    def from_index_db_record(cls, mail_record: Dict) -> "MailLookup":
        """
        Build a MailLookup from a record of the index database.

        Args:
            mail_record (Dict): A dictionary representing an index database record with
                the keys "message_id", "email_index" and "folder_path" and
                optionally "start_pos" and "stop_pos".

        Returns:
            MailLookup: An instance of MailLookup.
        """
        message_id = mail_record["message_id"]
        message_index = mail_record["email_index"]
        folder_path = mail_record["folder_path"]
        # the byte positions are optional
        positions = [mail_record.get(key) for key in ("start_pos", "stop_pos")]
        return cls(message_index, folder_path, message_id, *positions)

    @classmethod
    def from_mail_record(cls, mail_record: Dict) -> "MailLookup":
        """
        Build a MailLookup from a mail record according to its "source" entry.

        Args:
            mail_record (Dict): A dictionary containing the mail record.

        Returns:
            MailLookup: An instance of MailLookup created from a gloda record if
                the source is "gloda" and from an index database record otherwise.
        """
        if mail_record["source"] == "gloda":
            factory = cls.from_gloda_record
        else:
            factory = cls.from_index_db_record
        return factory(mail_record)

from_gloda_record(mail_record) classmethod

Creates a MailLookup instance from a Gloda record.

Parameters:

Name Type Description Default
mail_record Dict

A dictionary representing a Gloda record.

required

Returns:

Name Type Description
MailLookup MailLookup

An instance of MailLookup.

Source code in thunderbird/mail.py
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
@classmethod
def from_gloda_record(cls, mail_record: Dict) -> "MailLookup":
    """
    Build a MailLookup from a record of the gloda database.

    Args:
        mail_record (Dict): A dictionary representing a Gloda record with
            the keys "message_id", "folderURI" and "messageKey".

    Returns:
        MailLookup: An instance of MailLookup without byte positions.
    """
    message_id = mail_record["message_id"]
    quoted_uri = mail_record["folderURI"]
    message_index = int(mail_record["messageKey"])
    # the folder URI is percent-encoded in the record
    sbd_folder, _folder = Mail.toSbdFolder(urllib.parse.unquote(quoted_uri))
    return cls(
        message_index,
        ThunderbirdMailbox.as_relative_path(sbd_folder),
        message_id,
    )

from_index_db_record(mail_record) classmethod

Creates a MailLookup instance from an index database record.

Parameters:

Name Type Description Default
mail_record Dict

A dictionary representing an index database record.

required

Returns:

Name Type Description
MailLookup MailLookup

An instance of MailLookup.

Source code in thunderbird/mail.py
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
@classmethod
def from_index_db_record(cls, mail_record: Dict) -> "MailLookup":
    """
    Build a MailLookup from a record of the index database.

    Args:
        mail_record (Dict): A dictionary representing an index database record with
            the keys "message_id", "email_index" and "folder_path" and
            optionally "start_pos" and "stop_pos".

    Returns:
        MailLookup: An instance of MailLookup.
    """
    message_id = mail_record["message_id"]
    message_index = mail_record["email_index"]
    folder_path = mail_record["folder_path"]
    # the byte positions are optional
    positions = [mail_record.get(key) for key in ("start_pos", "stop_pos")]
    return cls(message_index, folder_path, message_id, *positions)

from_mail_record(mail_record) classmethod

Creates a MailLookup instance based on the source of the mail record.

Parameters:

Name Type Description Default
mail_record Dict

A dictionary containing the mail record.

required

Returns:

Name Type Description
MailLookup MailLookup

An instance of MailLookup based on the source.

Source code in thunderbird/mail.py
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
@classmethod
def from_mail_record(cls, mail_record: Dict) -> "MailLookup":
    """
    Build a MailLookup from a mail record according to its "source" entry.

    Args:
        mail_record (Dict): A dictionary containing the mail record.

    Returns:
        MailLookup: An instance of MailLookup created from a gloda record if
            the source is "gloda" and from an index database record otherwise.
    """
    if mail_record["source"] == "gloda":
        factory = cls.from_gloda_record
    else:
        factory = cls.from_index_db_record
    return factory(mail_record)

Thunderbird

Bases: MailArchive

Thunderbird Mailbox access

Source code in thunderbird/mail.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
class Thunderbird(MailArchive):
    """
    Thunderbird Mailbox access

    Opens the gloda database and the mail index database of a user's
    Thunderbird profile and creates or updates the mail index from the
    mailboxes found below the profile's "Mail/Local Folders" directory.
    """

    # cache of Thunderbird instances by user name, filled by get()
    profiles = {}

    def __init__(self, user: str, db=None, profile=None):
        """
        construct a Thunderbird access instance for the given user

        Args:
            user(str): the user name, also the key into the profile map
            db: path of the gloda database; if both db and profile are None
                the two values are looked up in the profile map
            profile: path of the Thunderbird profile directory

        Raises:
            Exception: if the lookup is needed and the user has no entry
                in the profile map
            sqlite3.OperationalError: if the gloda database can not be opened
        """
        self.user = user
        if db is None and profile is None:
            # an empty yaml file is loaded as None - treat it as "no users"
            profileMap = Thunderbird.getProfileMap() or {}
            if user not in profileMap:
                raise Exception(f"user {user} missing in .thunderbird.yaml")
            profile_entry = profileMap[user]
            db = profile_entry["db"]
            profile = profile_entry["profile"]

        # Call the super constructor
        super().__init__(user=user, gloda_db_path=db, profile=profile)

        try:
            self.sqlDB = SQLDB(self.gloda_db_path, check_same_thread=False, timeout=5)
        except sqlite3.OperationalError as soe:
            # report the path that was actually used for opening the database
            print(f"could not open database {self.gloda_db_path}: {soe}")
            raise
        self.index_db = SQLDB(self.index_db_path, check_same_thread=False)
        self.local_folders = f"{self.profile}/Mail/Local Folders"
        self.errors = []

    def get_mailboxes(self, progress_bar=None, restore_toc: bool = False):
        """
        Create a dict of Thunderbird mailboxes.

        Args:
            progress_bar: optional progress bar; its total is set to the
                file count of the local folders tree
            restore_toc(bool): passed on to each ThunderbirdMailbox

        Returns:
            dict: ThunderbirdMailbox instances by mailbox path; mailboxes
                that raised a ValueError are left out and reported in
                self.errors
        """
        extensions = {"Folder": ".sbd", "Mailbox": ""}
        file_selector = FileSelector(
            path=self.local_folders, extensions=extensions, create_ui=False
        )
        if progress_bar is not None:
            progress_bar.total = file_selector.file_count
        mailboxes = {}  # Dictionary to store ThunderbirdMailbox instances
        # start with a clean error list for this traversal
        self.errors = []
        self._traverse_tree(
            file_selector.tree_structure, mailboxes, progress_bar, restore_toc
        )
        return mailboxes

    def add_mailbox(
        self, mailbox_path, mailboxes, progress_bar, restore_toc: bool = False
    ):
        """
        add a ThunderbirdMailbox for the given mailbox_path to the mailboxes

        Args:
            mailbox_path: the path of the mailbox, used as key in mailboxes
            mailboxes(dict): the dict to add the new mailbox to
            progress_bar: optional progress bar to advance by one step
            restore_toc(bool): passed on to the ThunderbirdMailbox
        """
        mailboxes[mailbox_path] = ThunderbirdMailbox(
            self, mailbox_path, restore_toc=restore_toc
        )
        if progress_bar:
            progress_bar.update(1)

    def _traverse_tree(self, parent, mailboxes, progress_bar, restore_toc):
        """
        traverse the file system tree from the given parent node

        The parent itself is handled first, then all its children recursively.
        """
        self._add_mailbox_from_node(parent, mailboxes, progress_bar, restore_toc)
        for child in parent.get("children", []):
            self._traverse_tree(child, mailboxes, progress_bar, restore_toc)

    def _add_mailbox_from_node(self, node, mailboxes, progress_bar, restore_toc):
        """
        add a mailbox from the given file system tree node

        The node with id "1" and ".sbd" nodes are skipped. A ValueError
        raised while adding the mailbox is recorded in self.errors
        instead of being propagated.
        """
        is_valid_mailbox = node["id"] != "1" and not node["label"].endswith(".sbd")
        if not is_valid_mailbox:
            return
        try:
            mailbox_path = node["value"]
            self.add_mailbox(mailbox_path, mailboxes, progress_bar, restore_toc)
        except ValueError as e:
            errmsg = f"{node['value']}: {str(e)}"
            self.errors.append(errmsg)

    def get_mailboxes_by_relative_path(self) -> Dict[str, "ThunderbirdMailbox"]:
        """
        Retrieves all mailboxes and returns a dictionary keyed by their relative folder paths.

        Returns:
            Dict[str, ThunderbirdMailbox]: A dictionary where the keys are relative folder paths and the values are
                                           ThunderbirdMailbox objects representing the corresponding mailboxes.
        """
        return {
            mailbox.relative_folder_path: mailbox
            for mailbox in self.get_mailboxes().values()
        }

    def to_view_lod(
        self,
        fs_mailboxes_dict: Dict[str, "ThunderbirdMailbox"],
        db_mailboxes_dict: Dict[str, Any],
        force_count: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Merges mailbox information from the filesystem and SQL database into a unified view.

        Args:
            fs_mailboxes_dict (Dict[str, ThunderbirdMailbox]): Mailboxes from the filesystem.
            db_mailboxes_dict (Dict[str, Any]): Mailboxes from the SQL database.
            force_count(bool): if True get count from mailboxes (costly!)
        Returns:
            List[Dict[str, Any]]: A unified list of dictionaries, each representing a mailbox,
                sorted descending by the filesystem update time and numbered in the "#" column.
        """
        merged_view_lod = []
        all_keys = set(fs_mailboxes_dict.keys()) | set(db_mailboxes_dict.keys())
        unknown = "❓"
        disk_symbol = "💾"  # Symbol representing the filesystem
        database_symbol = "🗄️"  # Symbol representing the database

        for key in all_keys:
            fs_mailbox = fs_mailboxes_dict.get(key)
            db_mailbox = db_mailboxes_dict.get(key)

            # the state shows where the mailbox is known: both / disk / database
            if fs_mailbox and db_mailbox:
                state_char = "🔄"
            elif fs_mailbox:
                state_char = disk_symbol
            else:
                state_char = database_symbol

            # prefer the count stored in the database, counting the mbox is costly
            if db_mailbox and "message_count" in db_mailbox:
                count_str = str(db_mailbox["message_count"])
            elif fs_mailbox and force_count:
                count_str = (
                    str(len(fs_mailbox.mbox)) if hasattr(fs_mailbox, "mbox") else "⚠️❓"
                )
            else:
                count_str = unknown

            if fs_mailbox:
                relative_folder_path = fs_mailbox.relative_folder_path
                # only mailboxes on disk get a folder link
                folder_url = f"/folder/{self.user}{relative_folder_path}"
                error_str = fs_mailbox.error
                fs_update_time = fs_mailbox.folder_update_time
            else:
                relative_folder_path = db_mailbox["relative_folder_path"]
                folder_url = "#"
                error_str = db_mailbox.get("Error", unknown)
                fs_update_time = unknown
            db_update_time = db_mailbox["folder_update_time"] if db_mailbox else unknown

            mailbox_record = {
                "State": state_char,
                "Folder": Link.create(folder_url, relative_folder_path),
                f"{disk_symbol}-Updated": fs_update_time,
                f"{database_symbol}-Updated": db_update_time,
                "Count": count_str,
                "Error": error_str,
            }
            merged_view_lod.append(mailbox_record)

        # Sorting by 'Updated' field
        merged_view_lod.sort(key=lambda x: x[f"{disk_symbol}-Updated"], reverse=True)

        # Assigning index after sorting
        for index, record in enumerate(merged_view_lod):
            merged_view_lod[index] = {"#": index, **record}

        return merged_view_lod

    def get_synched_mailbox_view_lod(self):
        """
        Fetches and synchronizes mailbox data from the filesystem and SQL database and returns a unified view.

        Returns:
            List[Dict[str, Any]]: A unified list of dictionaries, each representing a mailbox.
        """
        # Retrieve mailbox data from the filesystem
        fs_mboxes_dict = self.get_mailboxes_by_relative_path()

        # Retrieve mailbox data from the SQL database
        db_mboxes_dict = self.get_mailboxes_dod_from_sqldb(self.index_db)

        # Merge and format the data for view
        return self.to_view_lod(fs_mboxes_dict, db_mboxes_dict)

    def get_mailboxes_dod_from_sqldb(self, sql_db: SQLDB) -> dict:
        """
        Retrieve the mailbox list of dictionaries (LoD) from the given SQL database,
        and return it as a dictionary keyed by relative_folder_path.

        Args:
            sql_db (SQLDB): An instance of SQLDB connected to the SQLite database.

        Returns:
            dict: A dictionary of mailbox dictionaries, keyed by relative_folder_path;
                empty if the mailboxes table does not exist yet.

        Raises:
            sqlite3.OperationalError: for any database error other than a missing table
        """
        sql_query = """SELECT *
                       FROM mailboxes 
                       ORDER BY folder_update_time DESC"""
        try:
            mailboxes_lod = sql_db.query(sql_query)
            mailboxes_dict = {mb["relative_folder_path"]: mb for mb in mailboxes_lod}
            return mailboxes_dict
        except sqlite3.OperationalError as e:
            # a missing table just means that nothing has been indexed yet
            if "no such table" in str(e):
                return {}
            raise

    def index_mailbox(
        self,
        mailbox: "ThunderbirdMailbox",
        progress_bar: Progressbar,
        force_create: bool,
    ) -> tuple:
        """
        Process a single mailbox for updating the index.

        Args:
            mailbox (ThunderbirdMailbox): The mailbox to be processed.
            progress_bar (Progressbar): Progress bar object for visual feedback.
            force_create (bool): Flag to force creation of a new index.

        Returns:
            tuple: A tuple containing the message count and any exception occurred.
                Exceptions are returned instead of raised so that the caller
                can continue with the next mailbox.
        """
        message_count = 0
        exception = None

        try:
            mbox_lod = mailbox.get_index_lod()
            message_count = len(mbox_lod)

            if message_count > 0:
                with_create = not self.index_db_exists() or force_create
                mbox_entity_info = self.index_db.createTable(
                    mbox_lod,
                    "mail_index",
                    withCreate=with_create,
                    withDrop=with_create,
                )
                # first delete existing index entries (if any)
                # single quotes are doubled so that folder names containing
                # an apostrophe do not break the SQL string literal
                folder_path = str(mailbox.relative_folder_path).replace("'", "''")
                delete_cmd = f"DELETE FROM mail_index WHERE folder_path='{folder_path}'"
                self.index_db.execute(delete_cmd)
                # then store the new ones
                self.index_db.store(mbox_lod, mbox_entity_info, fixNone=True)

        except Exception as ex:
            exception = ex

        progress_bar.update(1)  # Update the progress bar after processing each mailbox
        return message_count, exception  # Single return statement

    def prepare_mailboxes_for_indexing(
        self,
        ixs: IndexingState,
        progress_bar: Optional[Progressbar] = None,
        relative_paths: Optional[List[str]] = None,
    ) -> None:
        """
        Prepare the mailboxes for indexing by identifying which ones need to be updated.

        The result is not returned but stored in the given indexing state:
        ixs.all_mailboxes holds the mailboxes considered (by mailbox path),
        ixs.mailboxes_to_update holds the ones that need updating (by relative
        folder path) and ixs.total_mailboxes is the number of mailboxes to update.

        Args:
            ixs:IndexingState: the indexing state to work on; ixs.force_create
                selects all mailboxes for updating
            progress_bar (Optional[Progressbar]): A progress bar instance for displaying the progress.
            relative_paths (Optional[List[str]]): A list of relative paths for specific mailboxes to update. If None, all mailboxes are considered.
        """

        # optionally Retrieve all mailboxes
        if not relative_paths:
            ixs.all_mailboxes = self.get_mailboxes(progress_bar)
        else:
            ixs.all_mailboxes = {}
            for relative_path in relative_paths:
                mailbox_path = f"{self.profile}/Mail/Local Folders{relative_path}"
                mailbox = ThunderbirdMailbox(self, mailbox_path)
                ixs.all_mailboxes[mailbox_path] = mailbox
        # Retrieve the current state of mailboxes from the index database, if not forcing creation
        mailboxes_update_dod = {}
        if not ixs.force_create:
            mailboxes_update_dod = self.get_mailboxes_dod_from_sqldb(self.index_db)

        # Dict to hold mailboxes that need updating
        ixs.mailboxes_to_update = {}

        # Iterate through each mailbox to check if it needs updating
        if ixs.force_create:
            # If force_create is True, add all mailboxes to the update list
            for mailbox in ixs.all_mailboxes.values():
                ixs.mailboxes_to_update[mailbox.relative_folder_path] = mailbox
        elif relative_paths is not None:
            # explicitly selected mailboxes are updated regardless of their age
            for mailbox in ixs.all_mailboxes.values():
                if mailbox.relative_folder_path in relative_paths:
                    ixs.mailboxes_to_update[mailbox.relative_folder_path] = mailbox
        else:
            for mailbox in ixs.all_mailboxes.values():
                # NOTE(review): the per-mailbox update time stored in the index
                # database is looked up but not used - the decision below is
                # based on the update time of the index database as a whole
                mailbox_info = mailboxes_update_dod.get(
                    mailbox.relative_folder_path, {}
                )
                _prev_mailbox_update_time = mailbox_info.get("folder_update_time")
                current_folder_update_time = mailbox.folder_update_time

                # Check if the mailbox needs updating
                if (
                    self.index_db_update_time is None
                ) or current_folder_update_time > self.index_db_update_time:
                    ixs.mailboxes_to_update[mailbox.relative_folder_path] = mailbox
        ixs.total_mailboxes = len(ixs.mailboxes_to_update)
        if progress_bar:
            progress_bar.total = ixs.total_mailboxes
            progress_bar.reset()

    def get_indexing_state(self, force_create: bool = False) -> IndexingState:
        """
        Check if the index database needs to be updated or created.

        The index counts as up to date only if it exists and was modified
        after the gloda database.

        Args:
            force_create (bool): Flag to force creation of a new index.

        Returns:
            IndexingState: with the update times, needs_update and state_msg set
        """
        gloda_db_update_time = (
            datetime.fromtimestamp(os.path.getmtime(self.gloda_db_path))
            if self.gloda_db_path
            else None
        )
        index_db_update_time = (
            datetime.fromtimestamp(os.path.getmtime(self.index_db_path))
            if self.index_db_exists()
            else None
        )
        # without both timestamps no comparison is possible - comparing a
        # datetime with None would raise a TypeError
        index_up_to_date = (
            index_db_update_time is not None
            and gloda_db_update_time is not None
            and index_db_update_time > gloda_db_update_time
        )
        ixs = IndexingState(
            gloda_db_update_time=gloda_db_update_time,
            index_db_update_time=index_db_update_time,
            force_create=force_create,
        )
        ixs.needs_update = not index_up_to_date or force_create
        marker = "❌ " if ixs.needs_update else "✅"
        ixs.state_msg = f"""{marker} {self.user} update times:
Index db: {ixs.index_db_update_time} 
   Gloda: {ixs.gloda_db_update_time}
"""
        return ixs

    def create_or_update_index(
        self,
        relative_paths: Optional[List[str]] = None,
        force_create: bool = False,
    ) -> IndexingState:
        """
        Determine the indexing state and create or update the index accordingly.

        Args:
            relative_paths (Optional[List[str]]): List of relative mailbox paths to specifically update.
            force_create (bool): Flag to force creation of a new index.

        Returns:
            IndexingState: the indexing state after the indexing run
        """
        ixs = self.get_indexing_state(force_create)
        self.do_create_or_update_index(ixs, relative_paths=relative_paths)
        return ixs

    def do_create_or_update_index(
        self,
        ixs: IndexingState,
        progress_bar: Optional[Progressbar] = None,
        relative_paths: Optional[List[str]] = None,
        callback: callable = None,
    ):
        """
        Create or update an index of emails from Thunderbird mailboxes, storing the data in an SQLite database.
        If the index is up to date and no relative paths are given only ixs.msg is set to the state message.

        Args:
            ixs:IndexingState: the indexing state to work with
            progress_bar (Progressbar, optional): Progress bar to display the progress of index creation.
            relative_paths (Optional[List[str]]): List of relative mailbox paths to specifically update. If None, updates all mailboxes or based on `force_create`.
            callback (callable, optional): called with (mailbox, message_count) after each mailbox has been indexed
        """
        if not (ixs.needs_update or relative_paths):
            ixs.msg = ixs.state_msg
            return

        if progress_bar is None:
            progress_bar = TqdmProgressbar(
                total=ixs.total_mailboxes, desc="create index", unit="mailbox"
            )

        self.prepare_mailboxes_for_indexing(
            ixs=ixs, progress_bar=progress_bar, relative_paths=relative_paths
        )

        needs_create = ixs.force_create or not self.index_db_exists()
        for mailbox in ixs.mailboxes_to_update.values():
            message_count, exception = self.index_mailbox(
                mailbox, progress_bar, needs_create
            )
            if message_count > 0 and needs_create:
                # Subsequent updates will not recreate the table
                needs_create = False

            if exception:
                mailbox.error = exception
                ixs.errors[mailbox.folder_path] = exception
            else:
                ixs.success[mailbox.folder_path] = message_count
            ixs.update_msg()
            if callback:
                callback(mailbox, message_count)

        # if no relative paths were set we need to recreate the mailboxes table;
        # the check mirrors the branch below so that an empty list is handled
        # like None instead of appending all mailboxes to the existing table
        needs_create = not relative_paths
        if relative_paths:
            # Delete existing entries for updated mailboxes
            # NOTE(review): rows are matched on folder_path while lookups use
            # relative_folder_path - confirm that this is the intended column
            for relative_path in relative_paths:
                # single quotes are doubled so that folder names containing
                # an apostrophe do not break the SQL string literal
                quoted_path = str(relative_path).replace("'", "''")
                delete_query = (
                    f"DELETE FROM mailboxes WHERE folder_path = '{quoted_path}'"
                )
                self.index_db.execute(delete_query)

            # Re-create the list of dictionaries for all selected mailboxes
            mailboxes_lod = [
                mailbox.to_dict() for mailbox in ixs.mailboxes_to_update.values()
            ]
        else:
            mailboxes_lod = [
                mailbox.to_dict() for mailbox in ixs.all_mailboxes.values()
            ]
        mailboxes_entity_info = self.index_db.createTable(
            mailboxes_lod,
            "mailboxes",
            withCreate=needs_create,
            withDrop=needs_create,
        )
        # Store the mailbox data in the 'mailboxes' table
        if len(mailboxes_lod) > 0:
            self.index_db.store(mailboxes_lod, mailboxes_entity_info)

    @classmethod
    def get_config_path(cls) -> str:
        """
        get the path of the ".thunderbird" configuration directory in the user's home
        """
        home = str(Path.home())
        return os.path.join(home, ".thunderbird")

    @classmethod
    def get_profiles_path(cls) -> str:
        """
        get the path of the thunderbird.yaml profiles file

        The configuration directory is created if it does not exist yet.
        """
        config_path = cls.get_config_path()
        # Ensure the config_path exists
        os.makedirs(config_path, exist_ok=True)

        profiles_path = os.path.join(config_path, "thunderbird.yaml")
        return profiles_path

    @classmethod
    def getProfileMap(cls):
        """
        get the profile map from a thunderbird.yaml file

        Returns:
            the content of the yaml file; an empty dict if the file is empty
        """
        profiles_path = cls.get_profiles_path()
        with open(profiles_path, "r", encoding="utf-8") as stream:
            profile_map = yaml.safe_load(stream)
        # safe_load returns None for an empty document
        return profile_map or {}

    @staticmethod
    def get(user):
        """
        get the cached Thunderbird instance for the given user, creating it on first access
        """
        if user not in Thunderbird.profiles:
            tb = Thunderbird(user)
            Thunderbird.profiles[user] = tb
        return Thunderbird.profiles[user]

    def query(self, sql_query: str, params=None):
        """
        query this mailbox gloda

        Args:
            sql_query(str): the sql query to execute
            params: the parameters for the query (optional)
        """
        if params is None:
            records = self.sqlDB.query(sql_query)
        else:
            records = self.sqlDB.query(sql_query, params)
        return records

__init__(user, db=None, profile=None)

construct a Thunderbird access instance for the given user

Source code in thunderbird/mail.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def __init__(self, user: str, db=None, profile=None):
    """
    construct a Thunderbird access instance for the given user

    Args:
        user(str): the user name, also the key into the profile map
        db: path of the gloda database; if both db and profile are None
            the two values are looked up in the profile map
        profile: path of the Thunderbird profile directory

    Raises:
        Exception: if the lookup is needed and the user has no entry
            in the profile map
        sqlite3.OperationalError: if the gloda database can not be opened
    """
    self.user = user
    if db is None and profile is None:
        # an empty yaml file is loaded as None - treat it as "no users"
        profileMap = Thunderbird.getProfileMap() or {}
        if user not in profileMap:
            raise Exception(f"user {user} missing in .thunderbird.yaml")
        profile_entry = profileMap[user]
        db = profile_entry["db"]
        profile = profile_entry["profile"]

    # Call the super constructor
    super().__init__(user=user, gloda_db_path=db, profile=profile)

    try:
        self.sqlDB = SQLDB(self.gloda_db_path, check_same_thread=False, timeout=5)
    except sqlite3.OperationalError as soe:
        # report the path that was actually used for opening the database
        print(f"could not open database {self.gloda_db_path}: {soe}")
        raise
    self.index_db = SQLDB(self.index_db_path, check_same_thread=False)
    self.local_folders = f"{self.profile}/Mail/Local Folders"
    self.errors = []

add_mailbox(mailbox_path, mailboxes, progress_bar, restore_toc=False)

Add a ThunderbirdMailbox for the given mailbox_path to the mailboxes dictionary.

Source code in thunderbird/mail.py
278
279
280
281
282
283
284
285
286
def add_mailbox(self, mailbox_path, mailboxes, progress_bar, restore_toc: bool = False):
    """
    Create the ThunderbirdMailbox for mailbox_path and register it in mailboxes.

    Args:
        mailbox_path: the path of the mailbox, used as key in mailboxes
        mailboxes(dict): the dict to add the new mailbox to
        progress_bar: optional progress bar to advance by one step
        restore_toc(bool): passed on to the ThunderbirdMailbox
    """
    new_mailbox = ThunderbirdMailbox(self, mailbox_path, restore_toc=restore_toc)
    mailboxes[mailbox_path] = new_mailbox
    if not progress_bar:
        return
    progress_bar.update(1)

do_create_or_update_index(ixs, progress_bar=None, relative_paths=None, callback=None)

Create or update an index of emails from Thunderbird mailboxes, storing the data in an SQLite database. If an index already exists and is up-to-date, this method will update it instead of creating a new one.

Parameters:

Name Type Description Default
ixs IndexingState

IndexingState: the indexing state to work with

required
progress_bar Progressbar

Progress bar to display the progress of index creation.

None
relative_paths Optional[List[str]]

List of relative mailbox paths to specifically update. If None, updates all mailboxes or based on force_create.

None
Source code in thunderbird/mail.py
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
def do_create_or_update_index(
    self,
    ixs: IndexingState,
    progress_bar: Optional[Progressbar] = None,
    relative_paths: Optional[List[str]] = None,
    callback: callable = None,
):
    """
    Create or update an index of emails from Thunderbird mailboxes, storing the data in an SQLite database.
    If the index is up to date and no relative paths are given only ixs.msg is set to the state message.

    Args:
        ixs:IndexingState: the indexing state to work with
        progress_bar (Progressbar, optional): Progress bar to display the progress of index creation.
        relative_paths (Optional[List[str]]): List of relative mailbox paths to specifically update. If None, updates all mailboxes or based on `force_create`.
        callback (callable, optional): called with (mailbox, message_count) after each mailbox has been indexed
    """
    if not (ixs.needs_update or relative_paths):
        ixs.msg = ixs.state_msg
        return

    if progress_bar is None:
        progress_bar = TqdmProgressbar(
            total=ixs.total_mailboxes, desc="create index", unit="mailbox"
        )

    self.prepare_mailboxes_for_indexing(
        ixs=ixs, progress_bar=progress_bar, relative_paths=relative_paths
    )

    needs_create = ixs.force_create or not self.index_db_exists()
    for mailbox in ixs.mailboxes_to_update.values():
        message_count, exception = self.index_mailbox(
            mailbox, progress_bar, needs_create
        )
        if message_count > 0 and needs_create:
            # Subsequent updates will not recreate the table
            needs_create = False

        if exception:
            mailbox.error = exception
            ixs.errors[mailbox.folder_path] = exception
        else:
            ixs.success[mailbox.folder_path] = message_count
        ixs.update_msg()
        if callback:
            callback(mailbox, message_count)

    # if no relative paths were set we need to recreate the mailboxes table;
    # the check mirrors the branch below so that an empty list is handled
    # like None instead of appending all mailboxes to the existing table
    needs_create = not relative_paths
    if relative_paths:
        # Delete existing entries for updated mailboxes
        # NOTE(review): rows are matched on folder_path while lookups use
        # relative_folder_path - confirm that this is the intended column
        for relative_path in relative_paths:
            # single quotes are doubled so that folder names containing
            # an apostrophe do not break the SQL string literal
            quoted_path = str(relative_path).replace("'", "''")
            delete_query = (
                f"DELETE FROM mailboxes WHERE folder_path = '{quoted_path}'"
            )
            self.index_db.execute(delete_query)

        # Re-create the list of dictionaries for all selected mailboxes
        mailboxes_lod = [
            mailbox.to_dict() for mailbox in ixs.mailboxes_to_update.values()
        ]
    else:
        mailboxes_lod = [
            mailbox.to_dict() for mailbox in ixs.all_mailboxes.values()
        ]
    mailboxes_entity_info = self.index_db.createTable(
        mailboxes_lod,
        "mailboxes",
        withCreate=needs_create,
        withDrop=needs_create,
    )
    # Store the mailbox data in the 'mailboxes' table
    if len(mailboxes_lod) > 0:
        self.index_db.store(mailboxes_lod, mailboxes_entity_info)

getProfileMap() classmethod

Get the profile map from the thunderbird.yaml file in the ~/.thunderbird configuration directory.

Source code in thunderbird/mail.py
695
696
697
698
699
700
701
702
703
@classmethod
def getProfileMap(cls):
    """
    get the profile map from a thunderbird.yaml file

    Returns:
        the content of the yaml file; an empty dict if the file is empty
    """
    profiles_path = cls.get_profiles_path()
    with open(profiles_path, "r", encoding="utf-8") as stream:
        profile_map = yaml.safe_load(stream)
    # safe_load returns None for an empty document - callers test
    # membership with "in" which would fail on None
    return profile_map or {}

get_indexing_state(force_create=False)

Check if the index database needs to be updated or created.

Parameters:

Name Type Description Default
force_create bool

Flag to force creation of a new index.

False

Returns:

Type Description
IndexingState

IndexingState

Source code in thunderbird/mail.py
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
    def get_indexing_state(self, force_create: bool=False) -> IndexingState:
        """
        Check if the index database needs to be updated or created.

        Args:
            force_create (bool): Flag to force creation of a new index.

        Returns:
            IndexingState
        """
        gloda_db_update_time = (
            datetime.fromtimestamp(os.path.getmtime(self.gloda_db_path))
            if self.gloda_db_path
            else None
        )
        index_db_update_time = (
            datetime.fromtimestamp(os.path.getmtime(self.index_db_path))
            if self.index_db_exists()
            else None
        )
        index_up_to_date = (
            self.index_db_exists() and index_db_update_time > gloda_db_update_time
        )
        ixs=IndexingState(
            gloda_db_update_time=gloda_db_update_time,
            index_db_update_time=index_db_update_time,
            force_create=force_create)
        ixs.needs_update = not index_up_to_date or force_create
        marker = "❌ " if ixs.needs_update else "✅"
        ixs.state_msg = f"""{marker} {self.user} update times:
Index db: {ixs.index_db_update_time} 
   Gloda: {ixs.gloda_db_update_time}
"""
        return ixs

get_mailboxes(progress_bar=None, restore_toc=False)

Create a dict of Thunderbird mailboxes.

Source code in thunderbird/mail.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def get_mailboxes(self, progress_bar=None, restore_toc: bool = False):
    """
    Collect the Thunderbird mailboxes below the local folders directory.

    Args:
        progress_bar: optional progress bar; its total is set to the
            file count of the local folders tree
        restore_toc(bool): passed on when the mailboxes are created

    Returns:
        dict: the mailboxes collected while traversing the folder tree
    """
    selector = FileSelector(
        path=self.local_folders,
        extensions={"Folder": ".sbd", "Mailbox": ""},
        create_ui=False,
    )
    if progress_bar is not None:
        progress_bar.total = selector.file_count
    # every traversal starts with a clean error list
    self.errors = []
    found = {}
    self._traverse_tree(selector.tree_structure, found, progress_bar, restore_toc)
    return found

get_mailboxes_by_relative_path()

Retrieves all mailboxes and returns a dictionary keyed by their relative folder paths.

This method fetches all Thunderbird mailboxes and organizes them in a dictionary where the keys are the relative paths of the mailboxes, providing a quick way to access a mailbox by its relative path.

Returns:

Type Description
Dict[str, ThunderbirdMailbox]

Dict[str, ThunderbirdMailbox]: A dictionary where the keys are relative folder paths and the values are ThunderbirdMailbox objects representing the corresponding mailboxes.

Source code in thunderbird/mail.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def get_mailboxes_by_relative_path(self) -> Dict[str, "ThunderbirdMailbox"]:
    """
    Return all mailboxes re-keyed by their relative folder path.

    The mailboxes are fetched with get_mailboxes(); the keys of that
    dictionary are dropped in favour of each mailbox's relative_folder_path.

    Returns:
        Dict[str, ThunderbirdMailbox]: the mailboxes by relative folder path
    """
    return {
        mbox.relative_folder_path: mbox for mbox in self.get_mailboxes().values()
    }

get_mailboxes_dod_from_sqldb(sql_db)

Retrieve the mailbox list of dictionaries (LoD) from the given SQL database, and return it as a dictionary keyed by relative_folder_path.

Parameters:

Name Type Description Default
sql_db SQLDB

An instance of SQLDB connected to the SQLite database.

required

Returns:

Name Type Description
dict dict

A dictionary of mailbox dictionaries, keyed by relative_folder_path.

Source code in thunderbird/mail.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def get_mailboxes_dod_from_sqldb(self, sql_db: SQLDB) -> dict:
    """
    Read the mailboxes table and return its records keyed by relative_folder_path.

    Args:
        sql_db (SQLDB): the database to query

    Returns:
        dict: the mailbox records by relative_folder_path; empty if the
            query fails with a "no such table" error

    Raises:
        sqlite3.OperationalError: for any other operational database error
    """
    sql_query = """SELECT *
                   FROM mailboxes 
                   ORDER BY folder_update_time DESC"""
    try:
        records = sql_db.query(sql_query)
        by_path = {}
        for record in records:
            by_path[record["relative_folder_path"]] = record
        return by_path
    except sqlite3.OperationalError as e:
        # only a missing table is tolerated
        if "no such table" not in str(e):
            raise e
        return {}

get_profiles_path() classmethod

Get the path of the thunderbird.yaml profiles file, creating the configuration directory if it does not exist yet.

Source code in thunderbird/mail.py
683
684
685
686
687
688
689
690
691
692
693
@classmethod
def get_profiles_path(cls) -> str:
    """
    Return the path of the thunderbird.yaml file in the configuration directory.

    The configuration directory is created if it does not exist yet.
    """
    config_dir = cls.get_config_path()
    os.makedirs(config_dir, exist_ok=True)
    return os.path.join(config_dir, "thunderbird.yaml")

get_synched_mailbox_view_lod()

Fetches and synchronizes mailbox data from the filesystem and SQL database and returns a unified view.

Returns:

Type Description

List[Dict[str, Any]]: A unified list of dictionaries, each representing a mailbox.

Source code in thunderbird/mail.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def get_synched_mailbox_view_lod(self):
    """
    Build the combined mailbox overview from the filesystem and the index database.

    Returns:
        List[Dict[str, Any]]: one view record per mailbox, as produced by ``to_view_lod``.
    """
    # filesystem side first, then the database side
    from_filesystem = self.get_mailboxes_by_relative_path()
    from_database = self.get_mailboxes_dod_from_sqldb(self.index_db)
    # merge both sides into the records shown in the view
    return self.to_view_lod(from_filesystem, from_database)

index_mailbox(mailbox, progress_bar, force_create)

Process a single mailbox for updating the index.

Parameters:

Name Type Description Default
mailbox ThunderbirdMailbox

The mailbox to be processed.

required
progress_bar Progressbar

Progress bar object for visual feedback.

required
force_create bool

Flag to force creation of a new index.

required

Returns:

Name Type Description
tuple tuple

A tuple containing the message count and any exception occurred.

Source code in thunderbird/mail.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
def index_mailbox(
    self,
    mailbox: "ThunderbirdMailbox",
    progress_bar: Progressbar,
    force_create: bool,
) -> tuple:
    """
    Process a single mailbox for updating the index.

    The index records of the mailbox replace any records previously stored
    for the same folder path in the ``mail_index`` table. Any exception
    raised on the way is captured and returned instead of being propagated.

    Args:
        mailbox (ThunderbirdMailbox): The mailbox to be processed.
        progress_bar (Progressbar): Progress bar object for visual feedback.
        force_create (bool): Flag to force creation of a new index.

    Returns:
        tuple: A tuple containing the message count and the exception that
            occurred (None if there was none).
    """
    message_count = 0
    exception = None

    try:
        mbox_lod = mailbox.get_index_lod()
        message_count = len(mbox_lod)

        if message_count > 0:
            with_create = not self.index_db_exists() or force_create
            mbox_entity_info = self.index_db.createTable(
                mbox_lod,
                "mail_index",
                withCreate=with_create,
                withDrop=with_create,
            )
            # first delete existing index entries (if any);
            # the path is embedded in a quoted SQL literal, so single quotes
            # in folder names have to be doubled to keep the statement valid
            quoted_path = str(mailbox.relative_folder_path).replace("'", "''")
            delete_cmd = f"DELETE FROM mail_index WHERE folder_path='{quoted_path}'"
            self.index_db.execute(delete_cmd)
            # then store the new ones
            self.index_db.store(mbox_lod, mbox_entity_info, fixNone=True)

    except Exception as ex:
        # deliberately best effort: the caller receives the exception as a value
        exception = ex

    progress_bar.update(1)  # Update the progress bar after processing each mailbox
    return message_count, exception  # Single return statement

prepare_mailboxes_for_indexing(ixs, progress_bar=None, relative_paths=None)

Prepare a list of mailboxes for indexing by identifying which ones need to be updated.

This function iterates through all Thunderbird mailboxes, checking if each needs an update based on the last update time in the index database. It returns two dictionaries: one containing all mailboxes and another containing only the mailboxes that need updating.

Parameters:

Name Type Description Default
ixs IndexingState

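The indexing state to work on: its force_create flag is read, and its all_mailboxes, mailboxes_to_update and total_mailboxes attributes are set.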

required
force_create bool

Flag to force creation of a new index for all mailboxes. Note: this is not a separate argument of the function; it is read from ixs.force_create.

required
progress_bar Optional[Progressbar]

A progress bar instance for displaying the progress.

None
relative_paths Optional[List[str]]

A list of relative paths for specific mailboxes to update. If None, all mailboxes are considered.

None

Returns:

Type Description
Tuple[Dict[str, ThunderbirdMailbox], Dict[str, ThunderbirdMailbox]]

Tuple[Dict[str, ThunderbirdMailbox], Dict[str, ThunderbirdMailbox]]: A tuple containing two dictionaries. The first dictionary contains all mailboxes, and the second contains only mailboxes that need updating.

Source code in thunderbird/mail.py
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
def prepare_mailboxes_for_indexing(
    self,
    ixs:IndexingState,
    progress_bar: Optional[Progressbar] = None,
    relative_paths: Optional[List[str]] = None,
) -> Tuple[Dict[str, "ThunderbirdMailbox"], Dict[str, "ThunderbirdMailbox"]]:
    """
    Prepare a list of mailboxes for indexing by identifying which ones need to be updated.

    The result is stored on the indexing state (``ixs.all_mailboxes``,
    ``ixs.mailboxes_to_update``, ``ixs.total_mailboxes``) and the two
    dictionaries are also returned.

    Args:
        ixs (IndexingState): the indexing state to work on; ``ixs.force_create``
            decides whether all mailboxes are (re)indexed.
        progress_bar (Optional[Progressbar]): A progress bar instance for displaying the progress.
        relative_paths (Optional[List[str]]): A list of relative paths for specific mailboxes to update.
            If None (or empty), all mailboxes are retrieved.

    Returns:
        Tuple[Dict[str, ThunderbirdMailbox], Dict[str, ThunderbirdMailbox]]: A tuple containing two dictionaries.
            The first dictionary contains all mailboxes, and the second contains only mailboxes that need updating
            (keyed by relative folder path).
    """

    # optionally Retrieve all mailboxes
    if not relative_paths:
        ixs.all_mailboxes = self.get_mailboxes(progress_bar)
    else:
        ixs.all_mailboxes = {}
        for relative_path in relative_paths:
            mailbox_path = f"{self.profile}/Mail/Local Folders{relative_path}"
            mailbox = ThunderbirdMailbox(self, mailbox_path)
            ixs.all_mailboxes[mailbox_path] = mailbox
    # Retrieve the current state of mailboxes from the index database, if not forcing creation
    mailboxes_update_dod = {}
    if not ixs.force_create:
        mailboxes_update_dod = self.get_mailboxes_dod_from_sqldb(self.index_db)

    # Dictionary to hold mailboxes that need updating
    ixs.mailboxes_to_update = {}

    # Iterate through each mailbox to check if it needs updating
    if ixs.force_create:
        # If force_create is True, add all mailboxes to the update list
        for mailbox in ixs.all_mailboxes.values():
            ixs.mailboxes_to_update[mailbox.relative_folder_path] = mailbox
    else:
        if relative_paths is not None:
            # explicit selection: only the requested relative paths are updated
            for mailbox in ixs.all_mailboxes.values():
                if mailbox.relative_folder_path in relative_paths:
                    ixs.mailboxes_to_update[mailbox.relative_folder_path] = mailbox
        else:
            for mailbox in ixs.all_mailboxes.values():
                # NOTE(review): the per-mailbox time stored in the database is
                # looked up but not used for the decision below, which compares
                # against the update time of the whole index database
                mailbox_info = mailboxes_update_dod.get(
                    mailbox.relative_folder_path, {}
                )
                _prev_mailbox_update_time = mailbox_info.get("folder_update_time")
                current_folder_update_time = mailbox.folder_update_time

                # Check if the mailbox needs updating
                if (
                    self.index_db_update_time is None
                ) or current_folder_update_time > self.index_db_update_time:
                    ixs.mailboxes_to_update[mailbox.relative_folder_path] = mailbox
    ixs.total_mailboxes = len(ixs.mailboxes_to_update)
    if progress_bar:
        progress_bar.total = ixs.total_mailboxes
        progress_bar.reset()
    # fulfil the declared return contract; callers that only read ixs are unaffected
    return ixs.all_mailboxes, ixs.mailboxes_to_update

query(sql_query, params)

Run a SQL query against the gloda database of this Thunderbird instance and return the resulting records.

Parameters:

Name Type Description Default
sql_query(str)

the sql query to execute

required
params

the parameters for the query

required
Source code in thunderbird/mail.py
712
713
714
715
716
717
718
719
720
721
def query(self, sql_query: str, params):
    """
    Run a SQL query on the gloda database of this instance.

    Args:
        sql_query(str): the sql query to execute
        params: the parameters for the query

    Returns:
        the records delivered by the underlying ``sqlDB.query`` call
    """
    return self.sqlDB.query(sql_query, params)

to_view_lod(fs_mailboxes_dict, db_mailboxes_dict, force_count=False)

Merges mailbox information from the filesystem and SQL database into a unified view.

Parameters:

Name Type Description Default
fs_mailboxes_dict Dict[str, ThunderbirdMailbox]

Mailboxes from the filesystem.

required
db_mailboxes_dict Dict[str, Any]

Mailboxes from the SQL database.

required
force_count bool

If True, get the message count from the mailboxes themselves (costly!).

False

Returns: List[Dict[str, Any]]: A unified list of dictionaries, each representing a mailbox.

Source code in thunderbird/mail.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
def to_view_lod(
    self,
    fs_mailboxes_dict: Dict[str, "ThunderbirdMailbox"],
    db_mailboxes_dict: Dict[str, Any],
    force_count: bool = False,
) -> List[Dict[str, Any]]:
    """
    Merges mailbox information from the filesystem and SQL database into a unified view.

    Every key found in either dictionary yields one record. The records are
    sorted by the filesystem update time (descending) and then numbered.

    Args:
        fs_mailboxes_dict (Dict[str, ThunderbirdMailbox]): Mailboxes from the filesystem.
        db_mailboxes_dict (Dict[str, Any]): Mailboxes from the SQL database.
        force_count(bool): if True get count from mailboxes (costly!)
    Returns:
        List[Dict[str, Any]]: A unified list of dictionaries, each representing a mailbox.
    """
    merged_view_lod = []
    all_keys = set(fs_mailboxes_dict.keys()) | set(db_mailboxes_dict.keys())
    unknown = "❓"
    disk_symbol = "💾"  # Symbol representing the filesystem
    database_symbol = "🗄️"  # Symbol representing the database

    for key in all_keys:
        fs_mailbox = fs_mailboxes_dict.get(key)
        db_mailbox = db_mailboxes_dict.get(key)

        # state: present on both sides, filesystem only, or database only
        if fs_mailbox and db_mailbox:
            state_char = "🔄"
        elif fs_mailbox:
            state_char = disk_symbol
        else:
            state_char = database_symbol

        # prefer the stored count; counting the mailbox itself is only done on request
        if db_mailbox and "message_count" in db_mailbox:
            count_str = str(db_mailbox["message_count"])
        elif fs_mailbox and force_count:
            count_str = (
                str(len(fs_mailbox.mbox)) if hasattr(fs_mailbox, "mbox") else "⚠️❓"
            )
        else:
            count_str = unknown
        relative_folder_path = (
            fs_mailbox.relative_folder_path
            if fs_mailbox
            else db_mailbox["relative_folder_path"]
        )
        folder_url = (
            f"/folder/{self.user}{relative_folder_path}" if fs_mailbox else "#"
        )
        if fs_mailbox:
            error_str = fs_mailbox.error
        else:
            # mailbox records are stored with a lowercase "error" key
            # (see ThunderbirdMailbox.to_dict); "Error" is kept as a fallback
            error_str = db_mailbox.get("error", db_mailbox.get("Error", unknown))
        fs_update_time = fs_mailbox.folder_update_time if fs_mailbox else unknown
        db_update_time = db_mailbox["folder_update_time"] if db_mailbox else unknown

        mailbox_record = {
            "State": state_char,
            "Folder": Link.create(folder_url, relative_folder_path),
            f"{disk_symbol}-Updated": fs_update_time,
            f"{database_symbol}-Updated": db_update_time,
            "Count": count_str,
            "Error": error_str,
        }
        merged_view_lod.append(mailbox_record)

    # Sorting by the filesystem 'Updated' field
    # NOTE(review): database-only rows carry the unknown marker string here, so
    # this assumes folder_update_time values are strings as well - confirm
    merged_view_lod.sort(key=lambda x: x[f"{disk_symbol}-Updated"], reverse=True)

    # Assigning index after sorting
    for index, record in enumerate(merged_view_lod):
        merged_view_lod[index] = {"#": index, **record}

    return merged_view_lod

ThunderbirdMailbox

mailbox wrapper

Source code in thunderbird/mail.py
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
class ThunderbirdMailbox:
    """
    Wrapper around a single Thunderbird mailbox file in mbox format.

    An instance keeps the owning Thunderbird object, the absolute and the
    relative folder path, the update time of the file and the opened
    ``mailbox.mbox`` object.
    """

    def __init__(
        self,
        tb: Thunderbird,
        folder_path: str,
        use_relative_path: bool = False,
        restore_toc: bool = True,
        debug: bool = False,
    ):
        """
        Initializes a new Mailbox object associated with a specific Thunderbird email client and mailbox folder.

        Args:
            tb (Thunderbird): An instance of the Thunderbird class representing the email client to which this mailbox belongs.
            folder_path (str): The file system path to the mailbox folder.
            use_relative_path (bool): If True, folder_path is taken relative to "Mail/Local Folders" of the
                profile (with or without a leading "/"). Default is False.
            restore_toc(bool): If True restore the table of contents from the index database (if it exists)
            debug (bool, optional): A flag for enabling debug mode. Default is False.

        The constructor sets the Thunderbird instance, folder path, and debug flag. It checks if the provided folder_path
        is a valid file and raises a ValueError if it does not exist. The method also handles the extraction of
        the relative folder path from the provided folder_path, especially handling the case where "Mail/Local Folders"
        is part of the path. Finally, it initializes the mailbox using the `mailbox.mbox` method.

        Raises:
            ValueError: If the provided folder_path does not correspond to an existing file.
        """
        self.tb = tb
        # Convert relative path to absolute path if use_relative_path is True
        if use_relative_path:
            # relative folder paths as produced by as_relative_path start with "/";
            # os.path.join discards all previous components when it meets an
            # absolute one, so the leading slash has to go before joining
            folder_path = os.path.join(
                tb.profile, "Mail/Local Folders", folder_path.lstrip("/")
            )

        self.folder_path = folder_path

        self.debug = debug
        self.error = ""
        if not os.path.isfile(folder_path):
            msg = f"{folder_path} does not exist"
            raise ValueError(msg)
        self.folder_update_time = self.tb._get_file_update_time(self.folder_path)
        self.relative_folder_path = ThunderbirdMailbox.as_relative_path(folder_path)
        self.mbox = mailbox.mbox(folder_path)
        if restore_toc and tb.index_db_exists():
            self.restore_toc_from_sqldb(tb.index_db)

    def to_dict(self) -> Dict[str, Any]:
        """
        Converts the ThunderbirdMailbox data to a dictionary for SQL database storage.

        Returns:
            Dict[str, Any]: The dictionary representation of the ThunderbirdMailbox.
        """
        message_count = len(self.mbox)
        return {
            "folder_path": self.folder_path,
            "relative_folder_path": self.relative_folder_path,
            "folder_update_time": self.folder_update_time,
            "message_count": message_count,
            "error": str(self.error),
        }

    def as_view_record(self, index: int):
        """
        Return this mailbox as a dict for display in a list of dicts grid.

        Args:
            index (int): the row number to show in the "#" column

        Returns:
            dict: the view record with the keys "#", "path", "folder_update_time" and "error"
        """
        return {
            "#": index,
            "path": self.relative_folder_path,
            "folder_update_time": self.folder_update_time,
            "error": str(self.error),
        }

    @staticmethod
    def as_relative_path(folder_path: str) -> str:
        """
        convert the folder_path to a relative path

        Args:
            folder_path(str): the folder path to  convert

        Returns:
            str: the part of the path after the last "Mail/Local Folders",
                or the unchanged path if that marker is not contained
        """
        # rpartition yields the complete string as its last element when the marker is missing
        return folder_path.rpartition("Mail/Local Folders")[2]

    def restore_toc_from_sqldb(self, sql_db: SQLDB) -> None:
        """
        Restore the table of contents from the given SQL database.

        This method fetches the TOC data from the SQLite database and uses it to rebuild the TOC in the mailbox.

        Args:
            sql_db (SQLDB): An instance of SQLDB connected to the SQLite database.
        """
        index_lod = self.get_toc_lod_from_sqldb(sql_db)
        self.restore_toc_from_lod(index_lod)

    def get_toc_lod_from_sqldb(self, sql_db: SQLDB) -> list:
        """
        Retrieve the index list of dictionaries (LoD) representing the TOC from the given SQL database.

        This method performs a SQL query to fetch the TOC information, which includes the email index, start position,
        and stop position for each email in the mailbox corresponding to the current folder path.

        Args:
            sql_db (SQLDB): An instance of SQLDB connected to the SQLite database.

        Returns:
            list: A list of dictionaries, each containing the index and TOC information for an email.
        """
        sql_query = """SELECT *
FROM mail_index 
WHERE folder_path = ?
ORDER BY email_index"""
        folder_path_param = (self.relative_folder_path,)
        index_lod = sql_db.query(sql_query, folder_path_param)
        return index_lod

    @classmethod
    def to_view_lod(
        cls, index_lod: List[Dict[str, Any]], user: str
    ) -> List[Dict[str, Any]]:
        """
        Converts a list of index record dictionaries into a format suitable for display in an ag-grid.
        It renames and repositions the 'email_index' key, removes 'start_pos' and 'stop_pos', and converts
        'message_id' to a hyperlink using a custom Link.create() function.

        The given records are modified in place; the returned list holds
        re-ordered copies of them.

        Args:
            index_lod (List[Dict[str, Any]]): A list of dictionaries representing the index records.
            user (str): The user identifier to be used in constructing URLs for hyperlinks.

        Returns:
            List[Dict[str, Any]]: The list of modified index record dictionaries.
        """
        for record in index_lod:
            # HTML-encode potentially unsafe fields
            for key in record:
                if isinstance(record[key], str):
                    record[key] = html.escape(record[key])

            # Renaming 'email_index' to '#' and switching to 1-based numbering
            record["#"] = record.pop("email_index") + 1

            # Removing 'start_pos','stop_pos' and 'folder_path'
            record.pop("start_pos", None)
            record.pop("stop_pos", None)
            record.pop("folder_path", None)

            # Converting 'message_id' to a hyperlink
            mail_id = record["message_id"]
            normalized_mail_id = Mail.normalize_mailid(mail_id)
            url = f"/mail/{user}/{normalized_mail_id}"
            record["message_id"] = Link.create(url, text=normalized_mail_id)

        # Reordering keys to ensure '#' is first; the sort is stable, so all
        # other keys keep their relative order
        sorted_index_lod = [
            {k: record[k] for k in sorted(record, key=lambda x: x != "#")}
            for record in index_lod
        ]
        return sorted_index_lod

    def restore_toc_from_lod(self, index_lod: list) -> None:
        """
        Restores the table of contents of the mailbox using records from an SQLite database.

        This method iterates over a list of dictionaries where each dictionary contains details about an email,
        including its positions in the mailbox file. It uses this information to reconstruct the mailbox's TOC.

        Args:
            index_lod (list of dict): A list of records from the SQLite database. Each record is a dictionary
                                      containing details about an email, including its positions in the mailbox file.
        """
        # Reinitialize the mailbox's TOC structure
        # NOTE(review): with an empty index_lod the TOC stays empty, so the
        # mailbox then presumably appears to hold no messages - confirm intent
        self.mbox._toc = {}

        # Iterate over the index records to rebuild the TOC
        for record in index_lod:
            idx = record["email_index"]
            start_pos = record["start_pos"]
            stop_pos = record["stop_pos"]

            # Update the TOC with the new positions
            self.mbox._toc[idx] = (start_pos, stop_pos)

    def decode_subject(self, subject) -> str:
        """
        Decode a (possibly encoded) subject header into a plain string.

        Args:
            subject: the raw subject header value

        Returns:
            str: the concatenated decoded parts; byte parts are decoded with
                their declared charset, falling back to utf-8
        """
        # Decode the subject
        decoded_bytes = decode_header(subject)
        # Concatenate the decoded parts
        decoded_subject = "".join(
            str(text, charset or "utf-8") if isinstance(text, bytes) else text
            for text, charset in decoded_bytes
        )
        return decoded_subject

    def get_index_lod(self):
        """
        get the list of dicts for indexing

        Returns:
            list: one record per message with folder path, message id, sender,
                recipient, subject, dates, index, file positions and an error text
        """
        lod = []
        for idx, message in enumerate(self.mbox):
            start_pos, stop_pos = self.mbox._toc.get(idx, (None, None))
            decoded_subject = "?"
            msg_date, msg_iso_date, error_msg = Mail.get_iso_date(message)
            try:
                # Decode the subject
                decoded_subject = self.decode_subject(message.get("Subject", "?"))
            except Exception as e:
                subject_error = f"{str(e)}"
                # keep an error already reported for the date instead of overwriting it
                error_msg = (
                    f"{error_msg}; {subject_error}" if error_msg else subject_error
                )

            record = {
                "folder_path": self.relative_folder_path,
                "message_id": message.get(
                    "Message-ID", f"{self.relative_folder_path}#{idx}"
                ),
                "sender": str(message.get("From", "?")),
                "recipient": str(message.get("To", "?")),
                "subject": decoded_subject,
                "date": msg_date,
                "iso_date": msg_iso_date,
                "email_index": idx,
                "start_pos": start_pos,
                "stop_pos": stop_pos,
                "error": error_msg,  # Add the error message if any
            }
            lod.append(record)

        return lod

    def get_message_by_key(self, messageKey: int) -> Message:
        """
        Retrieves the email message by its message key.

        This method fetches an email message based on its unique message key from the mbox mailbox file. It uses the
        `messageKey` to index into the mbox file and retrieve the specific message. The method profiles the time taken
        to fetch the message using the `Profiler` utility class for performance monitoring.

        Args:
            messageKey (int): The unique key (index) of the email message to be retrieved.

        Returns:
            Message: The email message object corresponding to the specified message key.

        Note:
            The `messageKey` is assumed to be 1-based when passed to this function, but the `mailbox.mbox` class uses
            0-based indexing, so 1 is subtracted from `messageKey` for internal use.
        """
        getTime = Profiler(
            f"mbox.get {messageKey-1} from {self.folder_path}", profile=self.debug
        )
        msg = self.mbox.get(messageKey - 1)
        getTime.time()
        return msg

    def get_message_by_pos(self, start_pos: int, stop_pos: int) -> Optional[Message]:
        """
        Fetches an email message by its start and stop byte positions in the mailbox file
        and parses it into an email.message.Message object.

        Args:
            start_pos (int): The starting byte position of the message in the mailbox file.
            stop_pos (int): The stopping byte position of the message in the mailbox file.

        Returns:
            Message: The email message object parsed from the specified byte range,
        Raises:
            FileNotFoundError: If the mailbox file does not exist.
            IOError: If an error occurs during file opening or reading.

        """
        with open(self.folder_path, "rb") as mbox_file:
            mbox_file.seek(start_pos)  # Move to the start position
            content = mbox_file.read(stop_pos - start_pos)  # Read the specified range

        # Parse the content into an email.message.Message object
        return message_from_bytes(content)

    def search_message_by_key(self, mailid: str):
        """
        Search the mailbox sequentially for the message with the given mail id.

        Args:
            mailid (str): the message id without the surrounding angle brackets

        Returns:
            the first message whose "Message-Id" header equals "<mailid>",
            or None if there is no such message
        """
        msg = None
        searchId = f"<{mailid}>"
        searchTime = Profiler(
            f"keySearch {searchId} after mbox.get failed", profile=self.debug
        )
        for key in self.mbox.keys():
            keyMsg = self.mbox.get(key)
            msgId = keyMsg.get("Message-Id")
            if msgId == searchId:
                msg = keyMsg
                break
        searchTime.time()
        return msg

    def close(self):
        """
        close the mailbox
        """
        self.mbox.close()

__init__(tb, folder_path, use_relative_path=False, restore_toc=True, debug=False)

Initializes a new Mailbox object associated with a specific Thunderbird email client and mailbox folder.

Parameters:

Name Type Description Default
tb Thunderbird

An instance of the Thunderbird class representing the email client to which this mailbox belongs.

required
folder_path str

The file system path to the mailbox folder.

required
use_relative_path bool

If True, use a relative path for the mailbox folder. Default is False.

False
restore_toc bool

If True, restore the table of contents from the index database (if it exists).

True
debug bool

A flag for enabling debug mode. Default is False.

False

The constructor sets the Thunderbird instance, folder path, and debug flag. It checks if the provided folder_path is a valid file and raises a ValueError if it does not exist. The method also handles the extraction of the relative folder path from the provided folder_path, especially handling the case where "Mail/Local Folders" is part of the path. Finally, it initializes the mailbox using the mailbox.mbox method.

Raises:

Type Description
ValueError

If the provided folder_path does not correspond to an existing file.

Source code in thunderbird/mail.py
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
def __init__(
    self,
    tb: Thunderbird,
    folder_path: str,
    use_relative_path: bool = False,
    restore_toc: bool = True,
    debug: bool = False,
):
    """
    Initializes a new Mailbox object associated with a specific Thunderbird email client and mailbox folder.

    Args:
        tb (Thunderbird): An instance of the Thunderbird class representing the email client to which this mailbox belongs.
        folder_path (str): The file system path to the mailbox folder.
        use_relative_path (bool): If True, folder_path is taken relative to "Mail/Local Folders" of the
            profile (with or without a leading "/"). Default is False.
        restore_toc(bool): If True restore the table of contents from the index database (if it exists)
        debug (bool, optional): A flag for enabling debug mode. Default is False.

    The constructor sets the Thunderbird instance, folder path, and debug flag. It checks if the provided folder_path
    is a valid file and raises a ValueError if it does not exist. The method also handles the extraction of
    the relative folder path from the provided folder_path, especially handling the case where "Mail/Local Folders"
    is part of the path. Finally, it initializes the mailbox using the `mailbox.mbox` method.

    Raises:
        ValueError: If the provided folder_path does not correspond to an existing file.
    """
    self.tb = tb
    # Convert relative path to absolute path if use_relative_path is True
    if use_relative_path:
        # relative folder paths as produced by as_relative_path start with "/";
        # os.path.join discards all previous components when it meets an
        # absolute one, so the leading slash has to go before joining
        folder_path = os.path.join(
            tb.profile, "Mail/Local Folders", folder_path.lstrip("/")
        )

    self.folder_path = folder_path

    self.debug = debug
    self.error = ""
    if not os.path.isfile(folder_path):
        msg = f"{folder_path} does not exist"
        raise ValueError(msg)
    self.folder_update_time = self.tb._get_file_update_time(self.folder_path)
    self.relative_folder_path = ThunderbirdMailbox.as_relative_path(folder_path)
    self.mbox = mailbox.mbox(folder_path)
    if restore_toc and tb.index_db_exists():
        self.restore_toc_from_sqldb(tb.index_db)

as_relative_path(folder_path) staticmethod

convert the folder_path to a relative path

Parameters:

Name Type Description Default
folder_path(str)

the folder path to convert

required

Returns:

Name Type Description
str str

the relative path

Source code in thunderbird/mail.py
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
@staticmethod
def as_relative_path(folder_path: str) -> str:
    """
    Reduce a mailbox path to the part below "Mail/Local Folders".

    Args:
        folder_path(str): the folder path to convert

    Returns:
        str: everything after the last "Mail/Local Folders" in the path,
            or the unchanged path when that marker is not contained
    """
    marker = "Mail/Local Folders"
    # rpartition yields the complete string as its last element when the marker is missing
    _head, _sep, tail = folder_path.rpartition(marker)
    return tail

as_view_record(index)

Return this mailbox as a dictionary suitable for display as one row of a list-of-dicts grid.

Source code in thunderbird/mail.py
841
842
843
844
845
846
847
848
849
850
def as_view_record(self,index:int):
    """
    Describe this mailbox as one row for a list of dicts grid.

    Args:
        index (int): the row number shown in the "#" column

    Returns:
        dict: the row with the keys "#", "path", "folder_update_time" and "error"
    """
    view_record = {"#": index}
    view_record["path"] = self.relative_folder_path
    view_record["folder_update_time"] = self.folder_update_time
    # the error may be an exception object, so show its text form
    view_record["error"] = str(self.error)
    return view_record

close()

close the mailbox

Source code in thunderbird/mail.py
1084
1085
1086
1087
1088
def close(self):
    """
    Close the wrapped mbox object.
    """
    wrapped_mbox = self.mbox
    wrapped_mbox.close()

get_index_lod()

get the list of dicts for indexing

Source code in thunderbird/mail.py
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
def get_index_lod(self):
    """
    Build one index record per message of this mailbox.

    Returns:
        list: a list of dicts with the keys folder_path, message_id, sender,
            recipient, subject, date, iso_date, email_index, start_pos,
            stop_pos and error
    """
    index_records = []
    for email_index, message in enumerate(self.mbox):
        # byte range of the message taken from the mbox table of contents
        start_pos, stop_pos = self.mbox._toc.get(email_index, (None, None))
        msg_date, msg_iso_date, error_msg = Mail.get_iso_date(message)
        try:
            subject = self.decode_subject(message.get("Subject", "?"))
        except Exception as ex:
            # a failure while decoding the subject replaces the date error
            subject = "?"
            error_msg = f"{str(ex)}"

        index_records.append(
            {
                "folder_path": self.relative_folder_path,
                # messages without a Message-ID get a folder based fallback id
                "message_id": message.get(
                    "Message-ID", f"{self.relative_folder_path}#{email_index}"
                ),
                "sender": str(message.get("From", "?")),
                "recipient": str(message.get("To", "?")),
                "subject": subject,
                "date": msg_date,
                "iso_date": msg_iso_date,
                "email_index": email_index,
                "start_pos": start_pos,
                "stop_pos": stop_pos,
                "error": error_msg,
            }
        )

    return index_records

get_message_by_key(messageKey)

Retrieves the email message by its message key.

This method fetches an email message based on its unique message key from the mbox mailbox file. It uses the messageKey to index into the mbox file and retrieve the specific message. The method profiles the time taken to fetch the message using the Profiler utility class for performance monitoring.

Parameters:

Name Type Description Default
messageKey int

The unique key (index) of the email message to be retrieved.

required

Returns:

Name Type Description
Message Message

The email message object corresponding to the specified message key.

Note

The messageKey is assumed to be 1-based when passed to this function, but the mailbox.mbox class uses 0-based indexing, so 1 is subtracted from messageKey for internal use.

Source code in thunderbird/mail.py
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
def get_message_by_key(self, messageKey: int) -> Message:
    """
    Fetch a message from the mbox by its 1-based message key.

    The lookup is timed with a `Profiler` that only reports when `self.debug`
    is set.

    Args:
        messageKey (int): The 1-based key (index) of the email message.

    Returns:
        Message: whatever `self.mbox.get` returns for the 0-based key
            `messageKey - 1`.
    """
    # the mbox is addressed with 0-based keys
    mbox_key = messageKey - 1
    stopwatch = Profiler(
        f"mbox.get {messageKey-1} from {self.folder_path}", profile=self.debug
    )
    message = self.mbox.get(mbox_key)
    stopwatch.time()
    return message

get_message_by_pos(start_pos, stop_pos)

Fetches an email message by its start and stop byte positions in the mailbox file and parses it into an email.message.Message object.

Parameters:

Name Type Description Default
start_pos int

The starting byte position of the message in the mailbox file.

required
stop_pos int

The stopping byte position of the message in the mailbox file.

required

Returns:

Name Type Description
Message Optional[Message]

The email message object parsed from the specified byte range.

Raises:

FileNotFoundError: If the mailbox file does not exist.
IOError: If an error occurs during file opening or reading.
ValueError: If the byte range does not represent a valid email message.

Source code in thunderbird/mail.py
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
def get_message_by_pos(self, start_pos: int, stop_pos: int) -> Optional[Message]:
    """
    Read the bytes between two positions of the mailbox file and parse them
    into an email.message.Message object.

    Args:
        start_pos (int): The byte position where the message starts.
        stop_pos (int): The byte position where the message stops.

    Returns:
        Message: The result of `message_from_bytes` for the bytes read.

    Raises:
        FileNotFoundError: If the mailbox file does not exist.
        IOError: If an error occurs during file opening or reading.
    """
    with open(self.folder_path, 'rb') as mbox_file:
        mbox_file.seek(start_pos)
        byte_count = stop_pos - start_pos
        raw_message = mbox_file.read(byte_count)
    # parsing works on the in-memory bytes, the file is already closed here
    return message_from_bytes(raw_message)

get_toc_lod_from_sqldb(sql_db)

Retrieve the index list of dictionaries (LoD) representing the TOC from the given SQL database.

This method performs a SQL query to fetch the TOC information, which includes the email index, start position, and stop position for each email in the mailbox corresponding to the current folder path.

Parameters:

Name Type Description Default
sql_db SQLDB

An instance of SQLDB connected to the SQLite database.

required

Returns:

Name Type Description
list list

A list of dictionaries, each containing the index and TOC information for an email.

Source code in thunderbird/mail.py
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
    def get_toc_lod_from_sqldb(self, sql_db: SQLDB) -> list:
        """
        Retrieve the index list of dictionaries (LoD) representing the TOC from the given SQL database.

        This method performs a SQL query to fetch the TOC information, which includes the email index, start position,
        and stop position for each email in the mailbox corresponding to the current folder path.

        Args:
            sql_db (SQLDB): An instance of SQLDB connected to the SQLite database.

        Returns:
            list: A list of dictionaries, each containing the index and TOC information for an email.
        """
        sql_query = """SELECT *
FROM mail_index 
WHERE folder_path = ?
ORDER BY email_index"""
        folder_path_param = (self.relative_folder_path,)
        index_lod = sql_db.query(sql_query, folder_path_param)
        return index_lod

restore_toc_from_lod(index_lod)

Restores the table of contents of the mailbox using records from an SQLite database.

This method iterates over a list of dictionaries where each dictionary contains details about an email, including its positions in the mailbox file. It uses this information to reconstruct the mailbox's TOC.

Parameters:

Name Type Description Default
index_lod list of dict

A list of records from the SQLite database. Each record is a dictionary containing details about an email, including its positions in the mailbox file.

required
Source code in thunderbird/mail.py
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
def restore_toc_from_lod(self, index_lod: list) -> None:
    """
    Replace the table of contents of the mbox with the given index records.

    Args:
        index_lod (list of dict): records providing the keys "email_index",
            "start_pos" and "stop_pos"; each record becomes one TOC entry
            mapping the email index to its (start_pos, stop_pos) tuple.
    """
    # start from an empty TOC that is attached to the mbox right away
    toc = {}
    self.mbox._toc = toc

    for index_record in index_lod:
        byte_range = (index_record["start_pos"], index_record["stop_pos"])
        toc[index_record["email_index"]] = byte_range

restore_toc_from_sqldb(sql_db)

Restore the table of contents from the given SQL database.

This method fetches the TOC data from the SQLite database and uses it to rebuild the TOC in the mailbox.

Parameters:

Name Type Description Default
sql_db SQLDB

An instance of SQLDB connected to the SQLite database.

required
Source code in thunderbird/mail.py
871
872
873
874
875
876
877
878
879
880
881
def restore_toc_from_sqldb(self, sql_db: SQLDB) -> None:
    """
    Rebuild the mbox table of contents from the index stored in a database.

    The TOC records are fetched with `get_toc_lod_from_sqldb` and then applied
    with `restore_toc_from_lod`.

    Args:
        sql_db (SQLDB): the database object to read the TOC records from.
    """
    self.restore_toc_from_lod(self.get_toc_lod_from_sqldb(sql_db))

search_message_by_key(mailid)

search messages by key

Source code in thunderbird/mail.py
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
def search_message_by_key(self, mailid: str):
    """
    Scan the mbox for the message with the given mail id.

    The mail id is wrapped in angle brackets and compared with the
    "Message-Id" header of each message until the first match.

    Args:
        mailid (str): the mail id without the surrounding angle brackets

    Returns:
        the first matching message, or None if no message matches
    """
    wanted_id = f"<{mailid}>"
    stopwatch = Profiler(
        f"keySearch {wanted_id} after mbox.get failed", profile=self.debug
    )
    found = None
    for key in self.mbox.keys():
        candidate = self.mbox.get(key)
        if candidate.get("Message-Id") == wanted_id:
            found = candidate
            break
    stopwatch.time()
    return found

to_dict()

Converts the ThunderbirdMailbox data to a dictionary for SQL database storage.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The dictionary representation of the ThunderbirdMailbox.

Source code in thunderbird/mail.py
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
def to_dict(self) -> Dict[str, Any]:
    """
    Collect the mailbox attributes into a dictionary for SQL database storage.

    Returns:
        Dict[str, Any]: folder path, relative folder path, folder update
            time, number of messages in the mbox and the error as a string.
    """
    mailbox_record = {
        "folder_path": self.folder_path,
        "relative_folder_path": self.relative_folder_path,
        "folder_update_time": self.folder_update_time,
        # len() of the mbox is the number of messages
        "message_count": len(self.mbox),
        "error": str(self.error),
    }
    return mailbox_record

to_view_lod(index_lod, user) classmethod

Converts a list of index record dictionaries into a format suitable for display in an ag-grid. It renames and repositions the 'email_index' key, removes 'start_pos' and 'stop_pos', and converts 'message_id' to a hyperlink using a custom Link.create() function.

Parameters:

Name Type Description Default
index_lod List[Dict[str, Any]]

A list of dictionaries representing the index records.

required
user str

The user identifier to be used in constructing URLs for hyperlinks.

required

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: The list of modified index record dictionaries.

Source code in thunderbird/mail.py
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
@classmethod
def to_view_lod(
    cls, index_lod: List[Dict[str, Any]], user: str
) -> List[Dict[str, Any]]:
    """
    Turn index records into rows for display in an ag-grid.

    Every string value is HTML-escaped, 'email_index' becomes the 1-based
    '#' column at the first position, 'start_pos', 'stop_pos' and
    'folder_path' are dropped and 'message_id' is replaced by a hyperlink
    created with Link.create().

    Note:
        The records of index_lod are modified in place; the returned list
        holds new dicts with the reordered keys.

    Args:
        index_lod (List[Dict[str, Any]]): the index records to convert.
        user (str): the user identifier used in the hyperlink URLs.

    Returns:
        List[Dict[str, Any]]: the converted records with '#' as first key.
    """
    view_lod = []
    for record in index_lod:
        # HTML-encode potentially unsafe fields
        escaped = {
            key: html.escape(value)
            for key, value in record.items()
            if isinstance(value, str)
        }
        record.update(escaped)

        # the grid counts from 1 while the index is 0-based
        record["#"] = record.pop("email_index") + 1

        for hidden_key in ("start_pos", "stop_pos", "folder_path"):
            record.pop(hidden_key, None)

        # link the normalized mail id to the mail page of the user
        normalized_mail_id = Mail.normalize_mailid(record["message_id"])
        url = f"/mail/{user}/{normalized_mail_id}"
        record["message_id"] = Link.create(url, text=normalized_mail_id)

        # '#' first, all other keys in their existing order
        view_record = {"#": record["#"]}
        view_record.update(
            (key, value) for key, value in record.items() if key != "#"
        )
        view_lod.append(view_record)
    return view_lod

mail_cmd

Created on 2023-11-23

@author: wf

ThunderbirdMailCmd

Bases: WebserverCmd

command line access to pyThunderbird

Source code in thunderbird/mail_cmd.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class ThunderbirdMailCmd(WebserverCmd):
    """
    command line access to pyThunderbird
    """

    def getArgParser(self, description: str, version_msg) -> ArgumentParser:
        """
        Extend the argument parser of the superclass with the mail options.

        Args:
            description (str): passed on to the superclass parser factory
            version_msg: passed on to the superclass parser factory

        Returns:
            ArgumentParser: the parser with the additional options for user,
                mail id, user list, index creation, force and verbose
        """
        parser = super().getArgParser(description, version_msg)
        parser.add_argument("-u", "--user", type=str, help="id of the user")
        parser.add_argument(
            "-m", "--mailid", type=str, help="id of the mail to retrieve"
        )
        parser.add_argument("-ul", "--user-list", default=[], nargs="+")
        parser.add_argument(
            "-ci",
            "--create-index",
            action="store_true",
            help="create an alternative index for the given users's Thunderbird mailarchive",
        )
        parser.add_argument(
            "-cil",
            "--create-index-list",
            default=[],
            nargs="+",
            help="create an alternative index for the given list of relative mailbox paths",
        )
        parser.add_argument(
            "-f",
            "--force",
            action="store_true",
            help="force the creation of a new index even if one already exists",
        )
        parser.add_argument(
            "-v",
            "--verbose",
            action="store_true",
            help="show verbose infos e.g. on startup [default: %(default)s]",
        )
        return parser

    def handle_args(self) -> bool:
        """
        Handle the mail specific command line arguments.

        The superclass handling runs first. With a user given, exactly one of
        the following is done, in this priority: create the index, create the
        index for a list of relative mailbox paths, or print a single mail.

        Returns:
            bool: True if the requested mail was found and printed, False if
                no user was given and neither a mail id nor index creation
                was requested (the help is printed in that case). All other
                paths fall through without an explicit return value.
        """
        super().handle_args()

        args = self.args

        if args.user is None:
            # without a user only the help can be shown
            # NOTE(review): a mail id or --create-index without a user is
            # silently ignored here - confirm that this is intended
            if args.mailid is None and not args.create_index:
                self.parser.print_help()
                return False
        elif args.create_index:
            tb = Thunderbird.get(args.user)
            indexing_state = tb.create_or_update_index(force_create=args.force)
            indexing_state.show_index_report(verbose=args.verbose)
        elif args.create_index_list:
            tb = Thunderbird.get(args.user)
            indexing_state = tb.create_or_update_index(
                force_create=args.force, relative_paths=args.create_index_list
            )
            indexing_state.show_index_report(verbose=args.verbose)
        elif args.mailid:
            # Creating a Mail object with the provided arguments
            mail = Mail(user=args.user, mailid=args.mailid, debug=args.debug)
            if mail.found:
                print(mail.msg)
                return True
            else:
                msg = f"mail with id {args.mailid} for user {args.user} not found"
                # print() takes the target stream as `file`; the former
                # `files` keyword raised a TypeError instead of reporting
                print(msg, file=sys.stderr)
                self.exit_code = 1

getArgParser(description, version_msg)

override the default argparser call

Source code in thunderbird/mail_cmd.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def getArgParser(self, description: str, version_msg) -> ArgumentParser:
    """
    Extend the argument parser of the superclass with the mail options.

    Args:
        description (str): passed on to the superclass parser factory
        version_msg: passed on to the superclass parser factory

    Returns:
        ArgumentParser: the parser with the additional options
    """
    parser = super().getArgParser(description, version_msg)
    # (option strings, add_argument keyword arguments) in registration order
    option_specs = [
        (("-u", "--user"), {"type": str, "help": "id of the user"}),
        (
            ("-m", "--mailid"),
            {"type": str, "help": "id of the mail to retrieve"},
        ),
        (("-ul", "--user-list"), {"default": [], "nargs": "+"}),
        (
            ("-ci", "--create-index"),
            {
                "action": "store_true",
                "help": "create an alternative index for the given users's Thunderbird mailarchive",
            },
        ),
        (
            ("-cil", "--create-index-list"),
            {
                "default": [],
                "nargs": "+",
                "help": "create an alternative index for the given list of relative mailbox paths",
            },
        ),
        (
            ("-f", "--force"),
            {
                "action": "store_true",
                "help": "force the creation of a new index even if one already exists",
            },
        ),
        (
            ("-v", "--verbose"),
            {
                "action": "store_true",
                "help": "show verbose infos e.g. on startup [default: %(default)s]",
            },
        ),
    ]
    for option_strings, settings in option_specs:
        parser.add_argument(*option_strings, **settings)
    return parser

handle_args()

Handles command line arguments.

This method processes the command line arguments provided to the script. It checks for the presence of required arguments and initializes the Mail object if the necessary arguments are provided.

Returns:

Name Type Description
bool bool

True if the arguments are processed successfully, False otherwise.

Source code in thunderbird/mail_cmd.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def handle_args(self) -> bool:
    """
    Handle the mail specific command line arguments.

    The superclass handling runs first. With a user given, exactly one of
    the following is done, in this priority: create the index, create the
    index for a list of relative mailbox paths, or print a single mail.

    Returns:
        bool: True if the requested mail was found and printed, False if
            no user was given and neither a mail id nor index creation
            was requested (the help is printed in that case). All other
            paths fall through without an explicit return value.
    """
    super().handle_args()

    args = self.args

    if args.user is None:
        # without a user only the help can be shown
        # NOTE(review): a mail id or --create-index without a user is
        # silently ignored here - confirm that this is intended
        if args.mailid is None and not args.create_index:
            self.parser.print_help()
            return False
    elif args.create_index:
        tb = Thunderbird.get(args.user)
        indexing_state = tb.create_or_update_index(force_create=args.force)
        indexing_state.show_index_report(verbose=args.verbose)
    elif args.create_index_list:
        tb = Thunderbird.get(args.user)
        indexing_state = tb.create_or_update_index(
            force_create=args.force, relative_paths=args.create_index_list
        )
        indexing_state.show_index_report(verbose=args.verbose)
    elif args.mailid:
        # Creating a Mail object with the provided arguments
        mail = Mail(user=args.user, mailid=args.mailid, debug=args.debug)
        if mail.found:
            print(mail.msg)
            return True
        else:
            msg = f"mail with id {args.mailid} for user {args.user} not found"
            # print() takes the target stream as `file`; the former
            # `files` keyword raised a TypeError instead of reporting
            print(msg, file=sys.stderr)
            self.exit_code = 1

main(argv=None)

main call

Source code in thunderbird/mail_cmd.py
100
101
102
103
104
105
106
107
108
def main(argv: list = None):
    """
    Run the Thunderbird mail command line tool.

    Args:
        argv (list): the command line arguments handed to `cmd_main`

    Returns:
        the exit code returned by `cmd_main`
    """
    webserver_config = ThunderbirdWebserver.get_config()
    mail_cmd = ThunderbirdMailCmd(
        config=webserver_config, webserver_cls=ThunderbirdWebserver
    )
    return mail_cmd.cmd_main(argv)

profiler

Created on 2021-10-15

@author: wf

Profiler

simple profiler

Source code in thunderbird/profiler.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Profiler:
    """
    Simple stopwatch that optionally prints a start and a duration message.
    """

    def __init__(self, msg, profile=True):
        """
        Remember the message and take the start time.

        Args:
            msg (str): the message to show if profiling is active
            profile (bool): True if messages should be shown
        """
        self.msg, self.profile = msg, profile
        self.starttime = time.time()
        if not profile:
            return
        print(f"Starting {msg} ...")

    def time(self, extraMsg=""):
        """
        Measure the seconds since construction and print them if profiling
        is active.

        Args:
            extraMsg (str): text appended to the message in the output

        Returns:
            float: the elapsed time in seconds
        """
        duration = time.time() - self.starttime
        if self.profile:
            print(f"{self.msg}{extraMsg} took {duration:5.1f} s")
        return duration

__init__(msg, profile=True)

construct me with the given msg and profile active flag

Parameters:

Name Type Description Default
msg str

the message to show if profiling is active

required
profile bool

True if messages should be shown

True
Source code in thunderbird/profiler.py
14
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(self, msg, profile=True):
    """
    Remember the message and take the start time.

    Args:
        msg (str): the message to show if profiling is active
        profile (bool): True if messages should be shown
    """
    self.msg, self.profile = msg, profile
    self.starttime = time.time()
    if not profile:
        return
    print(f"Starting {msg} ...")

time(extraMsg='')

time the action and print if profile is active

Source code in thunderbird/profiler.py
28
29
30
31
32
33
34
35
def time(self, extraMsg=""):
    """
    Measure the seconds since construction and print them if profiling
    is active.

    Args:
        extraMsg (str): text appended to the message in the output

    Returns:
        float: the elapsed time in seconds
    """
    duration = time.time() - self.starttime
    if not self.profile:
        return duration
    print(f"{self.msg}{extraMsg} took {duration:5.1f} s")
    return duration

search

Created on 2023-12-06

@author: wf

MailSearch

utility module to search Thunderbird mails https://github.com/WolfgangFahl/pyThunderbird/issues/15

Source code in thunderbird/search.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class MailSearch:
    """
    utility module to search Thunderbird mails
    https://github.com/WolfgangFahl/pyThunderbird/issues/15
    """

    def __init__(
        self,
        webserver,
        tb,
        search_dict: dict,
        with_ui: bool = True,
        result_limit: int = 1000,
    ):
        """
        Constructor.

        Args:
            webserver: The webserver instance.
            tb: The Thunderbird instance.
            search_dict (dict): The dictionary containing search parameters.
            with_ui (bool): If True, sets up the UI components.
            result_limit (int): maximum number of mails to be displayed
        """
        self.webserver = webserver
        self.tb = tb
        self.result_limit = result_limit
        if with_ui:
            self.setup_form(search_dict)

    def setup_form(self, search_dict: dict):
        """
        Set up a search form.

        Args:
            search_dict (dict): The dictionary containing search parameters.
        """
        self.dict_edit = DictEdit(search_dict)
        self.dict_edit.expansion.open()
        self.search_button = (
            ui.button("search", icon="search", color="primary")
            .tooltip("search thunderbird mails")
            .on("click", handler=self.on_search)
        )
        self.search_summary = ui.html()
        self.search_results_grid = ListOfDictsGrid(lod=[])

    def construct_query(self, search_criteria: dict) -> (str, list):
        """
        Construct the SQL query based on the search criteria.

        Every non-empty criterion with a known column becomes a
        `<column> LIKE ?` condition with a `%value%` parameter; the
        conditions are joined with AND. Criteria without a column mapping
        are ignored.

        Args:
            search_criteria (dict): The dictionary containing search parameters.

        Returns:
            tuple: A tuple containing the SQL query string and the list of parameters.

        The search is based on the `index_db` table structure:

        CREATE TABLE mail_index (
          folder_path TEXT,
          message_id TEXT,
          sender TEXT,
          recipient TEXT,
          subject TEXT,
          date TEXT,
          iso_date TEXT,
          email_index INTEGER,
          start_pos INTEGER,
          stop_pos INTEGER,
          error TEXT
        )
        """
        sql_query = "SELECT * FROM mail_index WHERE "
        query_conditions = []
        query_params = []

        # Mapping from search_dict keys to SQL table columns
        column_mappings = {
            "Subject": "subject",
            "From": "sender",
            "To": "recipient",
            "Message-ID:": "message_id",
        }

        for field, value in search_criteria.items():
            # .get avoids a KeyError for criteria without a column mapping
            sql_column = column_mappings.get(field)
            if value and sql_column:
                query_conditions.append(f"{sql_column} LIKE ?")
                query_params.append(f"%{value}%")

        # Special handling for fields like "Content" or "Date" can be added here

        if not query_conditions:
            sql_query = "SELECT * FROM mail_index"
        else:
            sql_query += " AND ".join(query_conditions)

        return sql_query, query_params

    async def on_search(self, _event: GenericEventArguments):
        """
        Handle the search based on the search form criteria.

        The summary shows the number of hits or a "too many results" message;
        at most `result_limit` results are loaded into the grid. Exceptions
        are passed to the webserver's `handle_exception`.

        Args:
            _event (GenericEventArguments): Event arguments (unused in this method).
        """
        try:
            search_criteria = self.dict_edit.d
            sql_query, query_params = self.construct_query(search_criteria)
            search_results = self.tb.index_db.query(sql_query, query_params)
            result_count = len(search_results)
            msg = f"{result_count} messages found"
            if result_count > self.result_limit:
                msg = f"too many results: {result_count}>{self.result_limit}"
            self.search_summary.content = msg
            search_results = search_results[: self.result_limit]
            view_lod = ThunderbirdMailbox.to_view_lod(search_results, user=self.tb.user)

            self.search_results_grid.load_lod(view_lod)
            self.search_results_grid.update()
        except Exception as ex:
            self.webserver.handle_exception(ex)

__init__(webserver, tb, search_dict, with_ui=True, result_limit=1000)

Constructor.

Parameters:

Name Type Description Default
webserver

The webserver instance.

required
tb

The Thunderbird instance.

required
search_dict dict

The dictionary containing search parameters.

required
result_limit int

maximum number of mails to be displayed

1000
with_ui bool

If True, sets up the UI components.

True
Source code in thunderbird/search.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(
    self,
    webserver,
    tb,
    search_dict: dict,
    with_ui: bool = True,
    result_limit: int = 1000,
):
    """
    Store the collaborators and optionally build the search form.

    Args:
        webserver: The webserver instance.
        tb: The Thunderbird instance.
        search_dict (dict): The dictionary containing search parameters.
        with_ui (bool): If True, sets up the UI components.
        result_limit (int): maximum number of mails to be displayed
    """
    self.webserver, self.tb = webserver, tb
    self.result_limit = result_limit
    if not with_ui:
        return
    self.setup_form(search_dict)

construct_query(search_criteria)

Construct the SQL query based on the search criteria.

Args:
    search_criteria (dict): The dictionary containing search parameters.

Returns:
    tuple: A tuple containing the SQL query string and the list of parameters.

The search is based on the index_db table structure:

CREATE TABLE mail_index ( folder_path TEXT, message_id TEXT, sender TEXT, recipient TEXT, subject TEXT, date TEXT, iso_date TEXT, email_index INTEGER, start_pos INTEGER, stop_pos INTEGER, error TEXT )

Source code in thunderbird/search.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def construct_query(self, search_criteria: dict) -> (str, list):
    """
    Construct the SQL query based on the search criteria.

    Every non-empty criterion with a known column becomes a
    `<column> LIKE ?` condition with a `%value%` parameter; the
    conditions are joined with AND. Criteria without a column mapping
    are ignored.

    Args:
        search_criteria (dict): The dictionary containing search parameters.

    Returns:
        tuple: A tuple containing the SQL query string and the list of parameters.

    The search is based on the `index_db` table structure:

    CREATE TABLE mail_index (
      folder_path TEXT,
      message_id TEXT,
      sender TEXT,
      recipient TEXT,
      subject TEXT,
      date TEXT,
      iso_date TEXT,
      email_index INTEGER,
      start_pos INTEGER,
      stop_pos INTEGER,
      error TEXT
    )
    """
    sql_query = "SELECT * FROM mail_index WHERE "
    query_conditions = []
    query_params = []

    # Mapping from search_dict keys to SQL table columns
    column_mappings = {
        "Subject": "subject",
        "From": "sender",
        "To": "recipient",
        "Message-ID:": "message_id",
    }

    for field, value in search_criteria.items():
        # .get avoids a KeyError for criteria without a column mapping
        sql_column = column_mappings.get(field)
        if value and sql_column:
            query_conditions.append(f"{sql_column} LIKE ?")
            query_params.append(f"%{value}%")

    # Special handling for fields like "Content" or "Date" can be added here

    if not query_conditions:
        sql_query = "SELECT * FROM mail_index"
    else:
        sql_query += " AND ".join(query_conditions)

    return sql_query, query_params

on_search(_event) async

Handle the search based on the search form criteria.

Parameters:

Name Type Description Default
event GenericEventArguments

Event arguments (unused in this method).

required
Source code in thunderbird/search.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def on_search(self, _event: GenericEventArguments):
    """
    React to a click on the search button.

    Builds the query from the current content of the search form, runs it
    against the index database and shows a summary plus at most
    ``result_limit`` hits in the result grid. Any exception is handed
    to the webserver's exception handler.

    Args:
        _event (GenericEventArguments): Event arguments (unused in this method).
    """
    try:
        criteria = self.dict_edit.d
        sql, params = self.construct_query(criteria)
        hits = self.tb.index_db.query(sql, params)
        hit_count = len(hits)
        if hit_count > self.result_limit:
            summary = f"too many results: {hit_count}>{self.result_limit}"
        else:
            summary = f"{hit_count} messages found"
        self.search_summary.content = summary
        # only the first result_limit hits are shown in the grid
        shown_hits = hits[: self.result_limit]
        rows = ThunderbirdMailbox.to_view_lod(shown_hits, user=self.tb.user)

        grid = self.search_results_grid
        grid.load_lod(rows)
        grid.update()
    except Exception as ex:
        self.webserver.handle_exception(ex)

setup_form(search_dict)

Set up a search form.

Parameters:

Name Type Description Default
search_dict dict

The dictionary containing search parameters.

required
Source code in thunderbird/search.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def setup_form(self, search_dict: dict):
    """
    Build the widgets of the search form.

    Creates an opened dict editor for the search criteria, a search button
    wired to on_search, an html element for the result summary and an
    initially empty result grid.

    Args:
        search_dict (dict): The dictionary containing search parameters.
    """
    editor = DictEdit(search_dict)
    self.dict_edit = editor
    editor.expansion.open()
    button = ui.button("search", icon="search", color="primary")
    button = button.tooltip("search thunderbird mails")
    self.search_button = button.on("click", handler=self.on_search)
    self.search_summary = ui.html()
    self.search_results_grid = ListOfDictsGrid(lod=[])

tb_webserver

Created on 2023-11-23

@author: wf

ThunderbirdSolution

Bases: InputWebSolution

the Thunderbird Mail solution

Source code in thunderbird/tb_webserver.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
class ThunderbirdSolution(InputWebSolution):
    """
    the Thunderbird Mail solution

    Provides the pages for browsing, searching and indexing the mail
    archives of the webserver for one client.
    """

    def __init__(self, webserver: ThunderbirdWebserver, client: Client):
        """
        Initialize the solution

        Calls the constructor of the base solution

        Args:
            webserver (ThunderbirdWebserver): The webserver instance associated with this context.
            client (Client): The client instance this context is associated with.
        """
        super().__init__(webserver, client)  # Call to the superclass constructor
        # no Thunderbird instance is selected until a page for a user is shown
        self.tb = None
        self.mail_archives = self.webserver.mail_archives

    async def show_mailboxes(self, user: str, profile_key: str):
        """
        Shows the mailboxes of a Thunderbird user profile

        Args:
            user (str): Username for identifying the user profile.
            profile_key (str): Thunderbird profile key.
        """

        def show_ui():
            try:
                if user not in self.mail_archives.mail_archives:
                    ui.html(f"Unknown user {user}")
                    return

                # Initialize the Thunderbird instance for the given user
                self.tb = Thunderbird.get(user)
                # get all mailboxes
                mboxes_view_lod = self.tb.get_synched_mailbox_view_lod()
                self.mboxes_view = ListOfDictsGrid(lod=mboxes_view_lod)
            except Exception as ex:
                self.handle_exception(ex)

        await self.setup_content_div(show_ui)

    async def show_search(self, user: str, profile_key: str):
        """
        Shows a search interface for Thunderbird mails.

        Args:
            user (str): Username for identifying the user profile.
            profile_key (str): Thunderbird profile key.
        """

        def show_ui():
            if user not in self.mail_archives.mail_archives:
                ui.html(f"Unknown user {user}")
                return

            # Initialize the Thunderbird instance for the given user
            self.tb = Thunderbird.get(user)

            # Define a search dictionary with default values or criteria
            search_dict = {
                "Subject": "",
                "From": "",
                "To": "",
                "Message-ID:": "",
            }
            # Initialize MailSearch with the Thunderbird instance and the search dictionary
            self.mail_search = MailSearch(self, self.tb, search_dict)

        await self.setup_content_div(show_ui)

    def update_mailboxes_grid(self, update_lod):
        """
        Load the given records into the mailboxes grid and fit its columns.

        Args:
            update_lod: list of dicts to show in the mailboxes grid
        """
        with self.mailboxes_grid_container:
            self.mailboxes_grid.load_lod(update_lod)
            self.mailboxes_grid.sizeColumnsToFit()

    def run_indexing(self, tb, progress_bar):
        """
        prepare and run indexing of mailboxes

        Any exception raised while indexing is passed to handle_exception.

        Args:
            tb: the Thunderbird instance whose index is created or updated
            progress_bar: the progress bar handed on to the indexing call
        """

        def update_grid(mailbox, message_count: int):
            """
            Callback for the indexing: add one record for the given mailbox
            to the grid.

            Args:
                mailbox: the mailbox reported by the indexing
                message_count (int): stored as "count" in the mailbox record
            """
            index = len(update_lod) + 1
            if index == 1:
                # first callback: show the message of the indexing state
                self.mailboxes_label.text = self.ixs.msg

            mb_record = mailbox.as_view_record(index=index)
            mb_record["count"] = message_count
            update_lod.append(mb_record)
            self.update_mailboxes_grid(update_lod)

        try:
            update_lod = []
            tb.do_create_or_update_index(
                ixs=self.ixs, progress_bar=progress_bar, callback=update_grid
            )
        except Exception as ex:
            self.handle_exception(ex)

    def run_prepare_indexing(self):
        """
        Prepare the mailboxes for indexing and show the update records of
        the indexing state in the mailboxes grid.
        """
        try:
            self.tb.prepare_mailboxes_for_indexing(
                ixs=self.ixs, progress_bar=self.progress_bar
            )
            update_lod = self.ixs.get_update_lod()
            self.update_mailboxes_grid(update_lod)
        except Exception as ex:
            self.handle_exception(ex)

    async def create_or_update_index(self, user: str, profile_key: str) -> None:
        """
        user interface to start creating or updating the index

        Args:
            user (str): Username for identifying the user profile.
            profile_key (str): Thunderbird profile key (not used by this method).
        """

        async def on_prepare():
            """
            Handle the prepare button click
            """
            # force index db update time
            self.tb.index_db_update_time = None
            await run.io_bound(self.run_prepare_indexing)

        async def on_index():
            """
            Handle the reindex button click
            """
            self.ixs.force_create = self.force_create_checkbox.value
            self.ixs.needs_update = True

            await run.io_bound(
                self.run_indexing, self.tb, progress_bar=self.progress_bar
            )

        def show_ui():
            """
            show my user interface
            """
            if user not in self.mail_archives.mail_archives:
                ui.html(f"Unknown user {user}")
            else:
                self.user = user
                self.tb = self.mail_archives.mail_archives[user]
                self.ixs = self.tb.get_indexing_state()
                self.progress_bar = NiceguiProgressbar(
                    total=100, desc="updating index", unit="mailboxes"
                )
                with ui.row() as self.header_row:
                    self.state_label = ui.label(self.ixs.state_msg)
                    self.mailboxes_label = ui.label("")
                    self.force_create_checkbox = ui.checkbox("Force Create")
                    self.prepare_button = ui.button("Prepare", on_click=on_prepare)
                    self.reindex_button = ui.button("Reindex", on_click=on_index)
                with ui.row() as self.mailboxes_grid_container:
                    # Create an instance of ListOfDictsGrid to display mailboxes
                    self.mailboxes_grid = ListOfDictsGrid(lod=[])

        await self.setup_content_div(show_ui)
        # for an unknown user show_ui creates neither a progress bar nor an
        # indexing state, so the initial indexing run must be skipped
        if user in self.mail_archives.mail_archives:
            await run.io_bound(
                self.run_indexing, self.tb, progress_bar=self.progress_bar
            )

    async def show_folders(self, user: str, profile_key: str) -> None:
        """
        Asynchronously shows a user's folder contents based on a profile key.

        Args:
            user (str): Username for identifying the user profile.
            profile_key (str): Thunderbird profile key

        Returns:
            None: The function is intended for display purposes only.
        """

        def show_ui():
            if user not in self.mail_archives.mail_archives:
                ui.html(f"Unknown user {user}")
            else:
                self.user = user
                self.tb = self.mail_archives.mail_archives[user]
                extensions = {"Folder": ".sbd", "Mailbox": ""}
                self.folder_selector = FileSelector(
                    path=self.tb.local_folders,
                    extensions=extensions,
                    handler=self.on_select_folder,
                )

        await self.setup_content_div(show_ui)

    async def on_select_folder(self, folder_path):
        """
        open the folder

        Args:
            folder_path: path of the selected folder; it is converted to a
                relative path and opened as /folder/<user><relative path>
                in a new tab
        """
        relative_path = ThunderbirdMailbox.as_relative_path(folder_path)
        url = f"/folder/{self.user}{relative_path}"
        return ui.open(url, new_tab=True)

    async def show_folder(self, user, folder_path: str):
        """
        show the folder with the given path

        Args:
            user: Username for identifying the user profile.
            folder_path (str): path of the folder, used as a relative path
        """

        def show_index():
            """
            load the table of contents of the folder from the index database
            and show it in the folder grid
            """
            try:
                index_lod = self.folder_mbox.get_toc_lod_from_sqldb(self.tb.index_db)
                view_lod = ThunderbirdMailbox.to_view_lod(index_lod, user)
                msg_count = len(self.folder_mbox.mbox)
                with self.folder_view:
                    self.folder_view.content = f"{msg_count:5} ({folder_path})"
                    self.folder_grid.load_lod(lod=view_lod)
                    self.folder_grid.sizeColumnsToFit()
            except Exception as ex:
                self.handle_exception(ex)

        def show():
            """
            set up the loading message and the still empty folder grid
            """
            self.tb = Thunderbird.get(user)
            self.folder_mbox = ThunderbirdMailbox(
                self.tb, folder_path, use_relative_path=True
            )
            self.folder_view = ui.html()
            self.folder_view.content = (
                f"Loading {self.folder_mbox.relative_folder_path} ..."
            )
            grid_config = GridConfig(key_col="email_index")
            self.folder_grid = ListOfDictsGrid(config=grid_config)
            self.folder_grid.html_columns = [1, 2]

        await self.setup_content_div(show)
        await run.io_bound(show_index)

    async def showMail(self, user: str, mailid: str):
        """
        Show the given mail of the given user.

        Args:
            user (str): Username for identifying the user profile.
            mailid (str): identifier of the mail to show
        """

        def get_mail():
            """
            Get the mail.
            """
            try:
                mail = self.webserver.get_mail(user, mailid)
                # check mail has a message
                if not mail.msg:
                    title_section = self.sections["title"]
                    html_markup = mail.as_html_error_msg()
                    title_section.content_div.content = html_markup
                else:
                    for section_name, section in self.sections.items():
                        html_markup = mail.as_html_section(section_name)
                        with section.content_div:
                            section.content_div.content = html_markup
                            section.update()
            except Exception as ex:
                self.handle_exception(ex)

        async def show():
            """
            Set up the sections the mail is rendered into.
            """
            try:
                self.sections = {}
                section_names = [
                    "title",
                    "info",
                    "wiki",
                    "headers",
                    "text",
                    "html",
                    "parts",
                ]
                visible_sections = {
                    "title",
                    "info",
                    "text",
                    "html",
                }  # Sections to be initially visible
                self.progress_bar = NiceguiProgressbar(100, "load mail", "steps")
                if user in self.mail_archives.mail_archives:
                    for section_name in section_names:
                        # Set initial visibility based on the section name
                        show_content = section_name in visible_sections
                        self.sections[section_name] = HideShow(
                            (section_name, section_name),
                            show_content=show_content,
                            lazy_init=True,
                        )
                    for section_name in section_names:
                        self.sections[section_name].set_content(ui.html())

                else:
                    self.mail_view = ui.html(f"unknown user {user}")
            except Exception as ex:
                self.handle_exception(ex)

        await self.setup_content_div(show)
        await run.io_bound(get_mail)

    def setup_content(self):
        """
        select users

        Shows the view records of the mail archives in a grid.
        """
        self.view_lod = self.mail_archives.as_view_lod()
        self.lod_grid = ListOfDictsGrid(lod=self.view_lod)

    async def home(self):
        """
        provide the main content page

        """
        await self.setup_content_div(self.setup_content)

__init__(webserver, client)

Initialize the solution

Calls the constructor of the base solution.

Args: webserver (ThunderbirdWebserver): The webserver instance associated with this context. client (Client): The client instance this context is associated with.

Source code in thunderbird/tb_webserver.py
172
173
174
175
176
177
178
179
180
181
182
183
def __init__(self, webserver: ThunderbirdWebserver, client: Client):
    """
    Create the solution for the given webserver and client.

    Delegates to the constructor of the base solution first, then takes
    over the mail archives of the webserver. No Thunderbird instance is
    selected yet.

    Args:
        webserver (ThunderbirdWebserver): The webserver instance associated with this context.
        client (Client): The client instance this context is associated with.
    """
    super().__init__(webserver, client)
    self.tb = None
    archives = self.webserver.mail_archives
    self.mail_archives = archives

create_or_update_index(user, profile_key) async

User interface to start creating or updating the index.

Source code in thunderbird/tb_webserver.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
async def create_or_update_index(self, user: str, profile_key: str) -> None:
    """
    user interface to start creating or updating the index

    Sets up the header row (state, prepare and reindex controls) and the
    mailboxes grid, then starts an initial indexing run. For an unknown
    user only a message is shown and no indexing is started.

    Args:
        user (str): Username for identifying the user profile.
        profile_key (str): Thunderbird profile key (not used by this method).
    """

    async def on_prepare():
        """
        Handle the prepare button click
        """
        # force index db update time
        self.tb.index_db_update_time = None
        await run.io_bound(self.run_prepare_indexing)

    async def on_index():
        """
        Handle the reindex button click
        """
        self.ixs.force_create = self.force_create_checkbox.value
        self.ixs.needs_update = True

        await run.io_bound(self.run_indexing, self.tb, progress_bar=self.progress_bar)

    def show_ui():
        """
        show my user interface
        """
        if user not in self.mail_archives.mail_archives:
            ui.html(f"Unknown user {user}")
        else:
            self.user = user
            self.tb = self.mail_archives.mail_archives[user]
            self.ixs = self.tb.get_indexing_state()
            self.progress_bar = NiceguiProgressbar(
                total=100, desc="updating index", unit="mailboxes"
            )
            with ui.row() as self.header_row:
                self.state_label = ui.label(self.ixs.state_msg)
                self.mailboxes_label = ui.label("")
                self.force_create_checkbox = ui.checkbox("Force Create")
                self.prepare_button = ui.button("Prepare", on_click=on_prepare)
                self.reindex_button = ui.button("Reindex", on_click=on_index)
            with ui.row() as self.mailboxes_grid_container:
                # Create an instance of ListOfDictsGrid to display mailboxes
                self.mailboxes_grid = ListOfDictsGrid(lod=[])

    await self.setup_content_div(show_ui)
    # for an unknown user show_ui creates neither a progress bar nor an
    # indexing state, so the initial indexing run must be skipped
    if user in self.mail_archives.mail_archives:
        await run.io_bound(self.run_indexing, self.tb, progress_bar=self.progress_bar)

home() async

provide the main content page

Source code in thunderbird/tb_webserver.py
459
460
461
462
463
464
async def home(self):
    """
    Provide the main content page.

    The content div is filled by setup_content.
    """
    content_builder = self.setup_content
    await self.setup_content_div(content_builder)

on_select_folder(folder_path) async

open the folder

Source code in thunderbird/tb_webserver.py
350
351
352
353
354
355
356
async def on_select_folder(self, folder_path):
    """
    Open the page of the selected folder in a new tab.

    The folder path is converted to a relative path and appended to
    /folder/<user> to form the url.
    """
    folder_rel_path = ThunderbirdMailbox.as_relative_path(folder_path)
    folder_url = f"/folder/{self.user}{folder_rel_path}"
    opened = ui.open(folder_url, new_tab=True)
    return opened

run_indexing(tb, progress_bar)

prepare and run indexing of mailboxes

Source code in thunderbird/tb_webserver.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def run_indexing(self, tb, progress_bar):
    """
    Create or update the index of the given Thunderbird instance and show
    each reported mailbox in the mailboxes grid.

    Exceptions raised by the indexing are passed to handle_exception.
    """
    update_lod = []

    def update_grid(mailbox, message_count: int):
        """
        Callback of the indexing: append the view record of the mailbox
        (including its message count) and refresh the grid.
        """
        position = len(update_lod) + 1
        if position == 1:
            # first mailbox: show the message of the indexing state
            self.mailboxes_label.text = self.ixs.msg

        record = mailbox.as_view_record(index=position)
        record["count"] = message_count
        update_lod.append(record)
        self.update_mailboxes_grid(update_lod)

    try:
        tb.do_create_or_update_index(
            ixs=self.ixs,
            progress_bar=progress_bar,
            callback=update_grid,
        )
    except Exception as ex:
        self.handle_exception(ex)

setup_content()

select users

Source code in thunderbird/tb_webserver.py
452
453
454
455
456
457
def setup_content(self):
    """
    Show the view records of the mail archives in a grid.
    """
    archive_records = self.mail_archives.as_view_lod()
    self.view_lod = archive_records
    self.lod_grid = ListOfDictsGrid(lod=archive_records)

showMail(user, mailid) async

Show the given mail of the given user.

Source code in thunderbird/tb_webserver.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
async def showMail(self, user: str, mailid: str):
    """
    Show the given mail of the given user.

    The sections are set up in the content div first; afterwards the mail
    is loaded via run.io_bound and rendered into these sections.
    """

    def get_mail():
        """
        Load the mail and render it into the prepared sections.
        """
        try:
            mail = self.webserver.get_mail(user, mailid)
            if mail.msg:
                for name, section in self.sections.items():
                    markup = mail.as_html_section(name)
                    with section.content_div:
                        section.content_div.content = markup
                        section.update()
            else:
                # mail without message: show the error in the title section
                title_section = self.sections["title"]
                title_section.content_div.content = mail.as_html_error_msg()
        except Exception as ex:
            self.handle_exception(ex)

    async def show():
        """
        Create one hide/show section per part of the mail view.
        """
        try:
            self.sections = {}
            all_sections = (
                "title",
                "info",
                "wiki",
                "headers",
                "text",
                "html",
                "parts",
            )
            # sections that are visible from the start
            initially_visible = {"title", "info", "text", "html"}
            self.progress_bar = NiceguiProgressbar(100, "load mail", "steps")
            if user not in self.mail_archives.mail_archives:
                self.mail_view = ui.html(f"unknown user {user}")
                return
            for name in all_sections:
                self.sections[name] = HideShow(
                    (name, name),
                    show_content=name in initially_visible,
                    lazy_init=True,
                )
            # the content is set only after all sections have been created
            for section in self.sections.values():
                section.set_content(ui.html())
        except Exception as ex:
            self.handle_exception(ex)

    await self.setup_content_div(show)
    await run.io_bound(get_mail)

show_folder(user, folder_path) async

show the folder with the given path

Source code in thunderbird/tb_webserver.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
async def show_folder(self, user, folder_path: str):
    """
    Show the folder with the given path.

    A loading message and an empty grid are set up first; afterwards the
    table of contents is read from the index database via run.io_bound and
    loaded into the grid.
    """

    def show_index():
        """
        Fill the folder grid from the index database.
        """
        try:
            toc_lod = self.folder_mbox.get_toc_lod_from_sqldb(self.tb.index_db)
            rows = ThunderbirdMailbox.to_view_lod(toc_lod, user)
            total = len(self.folder_mbox.mbox)
            with self.folder_view:
                self.folder_view.content = f"{total:5} ({folder_path})"
                self.folder_grid.load_lod(lod=rows)
                self.folder_grid.sizeColumnsToFit()
        except Exception as ex:
            self.handle_exception(ex)

    def show():
        """
        Set up the mailbox, the loading message and the empty folder grid.
        """
        self.tb = Thunderbird.get(user)
        self.folder_mbox = ThunderbirdMailbox(
            self.tb, folder_path, use_relative_path=True
        )
        placeholder = ui.html()
        self.folder_view = placeholder
        placeholder.content = f"Loading {self.folder_mbox.relative_folder_path} ..."
        self.folder_grid = ListOfDictsGrid(config=GridConfig(key_col="email_index"))
        self.folder_grid.html_columns = [1, 2]

    await self.setup_content_div(show)
    await run.io_bound(show_index)

show_folders(user, profile_key) async

Asynchronously shows a user's folder contents based on a profile key.

Parameters:

Name Type Description Default
user str

Username for identifying the user profile.

required
profile_key str

Thunderbird profile key

required

Returns:

Name Type Description
None None

The function is intended for display purposes only.

Source code in thunderbird/tb_webserver.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
async def show_folders(self, user: str, profile_key: str) -> None:
    """
    Show a selector for the local folders and mailboxes of a user.

    Args:
        user (str): Username for identifying the user profile.
        profile_key (str): Thunderbird profile key

    Returns:
        None: The function is intended for display purposes only.
    """

    def show_ui():
        """
        Create the folder selector, or a message for an unknown user.
        """
        if user not in self.mail_archives.mail_archives:
            ui.html(f"Unknown user {user}")
            return
        self.user = user
        self.tb = self.mail_archives.mail_archives[user]
        self.folder_selector = FileSelector(
            path=self.tb.local_folders,
            extensions={"Folder": ".sbd", "Mailbox": ""},
            handler=self.on_select_folder,
        )

    await self.setup_content_div(show_ui)

show_mailboxes(user, profile_key) async

Shows the mailboxes of a Thunderbird user profile

Parameters:

Name Type Description Default
user str

Username for identifying the user profile.

required
profile_key str

Thunderbird profile key.

required
Source code in thunderbird/tb_webserver.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
async def show_mailboxes(self, user: str, profile_key: str):
    """
    Show the mailboxes of a Thunderbird user profile in a grid.

    Args:
        user (str): Username for identifying the user profile.
        profile_key (str): Thunderbird profile key.
    """

    def show_ui():
        """
        Create the mailboxes grid, or a message for an unknown user.
        """
        try:
            if user in self.mail_archives.mail_archives:
                # select the Thunderbird instance of the user and list its mailboxes
                self.tb = Thunderbird.get(user)
                mailbox_rows = self.tb.get_synched_mailbox_view_lod()
                self.mboxes_view = ListOfDictsGrid(lod=mailbox_rows)
            else:
                ui.html(f"Unknown user {user}")
        except Exception as ex:
            self.handle_exception(ex)

    await self.setup_content_div(show_ui)

Shows a search interface for Thunderbird mails.

Parameters:

Name Type Description Default
user str

Username for identifying the user profile.

required
profile_key str

Thunderbird profile key.

required
Source code in thunderbird/tb_webserver.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
async def show_search(self, user: str, profile_key: str):
    """
    Show the search interface for the Thunderbird mails of a user.

    Args:
        user (str): Username for identifying the user profile.
        profile_key (str): Thunderbird profile key.
    """

    def show_ui():
        """
        Create the mail search, or a message for an unknown user.
        """
        if user in self.mail_archives.mail_archives:
            # select the Thunderbird instance of the user
            self.tb = Thunderbird.get(user)
            # all search criteria start out empty
            criteria_keys = ("Subject", "From", "To", "Message-ID:")
            search_dict = dict.fromkeys(criteria_keys, "")
            self.mail_search = MailSearch(self, self.tb, search_dict)
        else:
            ui.html(f"Unknown user {user}")

    await self.setup_content_div(show_ui)

ThunderbirdWebserver

Bases: InputWebserver

webserver for Thunderbird mail access via python

Source code in thunderbird/tb_webserver.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class ThunderbirdWebserver(InputWebserver):
    """
    webserver for Thunderbird mail access via python
    """

    @classmethod
    def get_config(cls) -> WebserverConfig:
        """
        get the configuration for this Webserver

        Returns:
            WebserverConfig: the server configuration with
                ThunderbirdSolution set as the solution class
        """
        copy_right = "(c)2020-2024 Wolfgang Fahl"
        config = WebserverConfig(
            copy_right=copy_right,
            version=Version(),
            short_name="tbmail",
            default_port=8482,
            timeout=15.0,
        )
        server_config = WebserverConfig.get(config)
        server_config.solution_class = ThunderbirdSolution
        return server_config

    def __init__(self):
        """
        Constructor

        Registers the routes for mail parts, wiki markup, mails, folders
        and the profile pages.
        """
        InputWebserver.__init__(self, config=ThunderbirdWebserver.get_config())

        @app.get("/part/{user}/{mailid}/{part_index:int}")
        async def get_part(user: str, mailid: str, part_index: int):
            return await self.get_part(user, mailid, part_index)

        @app.get("/mail/{user}/{mailid}.wiki")
        def get_mail_wikimarkup(user: str, mailid: str):
            mail = self.get_mail(user, mailid)
            if (
                not mail.msg
            ):  # Assuming mail objects have a 'msg' attribute to check if the message exists
                html_markup = mail.as_html_error_msg()

                raise HTTPException(status_code=404, detail=html_markup)
            response = Response(content=mail.asWikiMarkup(), media_type="text/plain")
            return response

        @ui.page("/mail/{user}/{mailid}")
        async def showMail(client: Client, user: str, mailid: str):
            return await self.page(client, ThunderbirdSolution.showMail, user, mailid)

        @ui.page("/folder/{user}/{folder_path:path}")
        async def showFolder(client: Client, user: str, folder_path: str):
            return await self.page(
                client, ThunderbirdSolution.show_folder, user, folder_path
            )

        @ui.page("/profile/{user}/{profile_key}/mailboxes")
        async def show_mailboxes(client: Client, user: str, profile_key: str):
            return await self.page(
                client, ThunderbirdSolution.show_mailboxes, user, profile_key
            )

        @ui.page("/profile/{user}/{profile_key}/search")
        async def show_search(client: Client, user: str, profile_key: str):
            return await self.page(
                client, ThunderbirdSolution.show_search, user, profile_key
            )

        @ui.page("/profile/{user}/{profile_key}/index")
        async def create_or_update_index(client: Client, user: str, profile_key: str):
            return await self.page(
                client, ThunderbirdSolution.create_or_update_index, user, profile_key
            )

        @ui.page("/profile/{user}/{profile_key}")
        async def show_folders(client: Client, user: str, profile_key: str):
            return await self.page(
                client, ThunderbirdSolution.show_folders, user, profile_key
            )

    def configure_run(self):
        """
        configure me e.g. the allowed urls

        Creates the mail archives for the configured user list.
        """
        InputWebserver.configure_run(self)
        # If args.user_list is None or empty, use users from the profiles
        # see https://github.com/WolfgangFahl/pyThunderbird/issues/19
        if not self.args.user_list:
            profile_map = Thunderbird.getProfileMap()
            user_list = list(profile_map.keys())
        else:
            user_list = self.args.user_list

        self.mail_archives = MailArchives(user_list)

    def get_mail(self, user: str, mailid: str) -> Any:
        """
        Retrieves a specific mail for a given user by its mail identifier.

        Args:
            user (str): The username of the individual whose mail is to be retrieved.
            mailid (str): The unique identifier for the mail to be retrieved.

        Returns:
            Any: Returns an instance of the Mail class corresponding to the specified `mailid` for the `user`.

        Raises:
            HTTPException: If the user is not found in the mail archives, an HTTP exception with status code 404 is raised.
        """
        if user not in self.mail_archives.mail_archives:
            raise HTTPException(status_code=404, detail=f"User '{user}' not found")
        tb = self.mail_archives.mail_archives[user]
        mail = Mail(user=user, mailid=mailid, tb=tb, debug=self.debug)
        return mail

    async def get_part(self, user: str, mailid: str, part_index: int) -> FileResponse:
        """
        Asynchronously retrieves a specific part of a mail for a given user, identified by the mail's unique ID and the part index.

        Args:
            user (str): The username of the individual whose mail part is to be retrieved.
            mailid (str): The unique identifier for the mail whose part is to be retrieved.
            part_index (int): The index of the part within the mail to retrieve.

        Returns:
            FileResponse: A file response object containing the specified part of the mail.

        Raises:
            HTTPException: If the user is not found in the mail archives, an HTTP exception with status code 404 is raised.
        """
        # reuse get_mail so that an unknown user leads to the same 404 as
        # for the other mail routes instead of a KeyError
        mail = self.get_mail(user, mailid)
        # NOTE(review): behavior for a non-existing part index depends on
        # Mail.part_as_fileresponse, which is not visible here
        response = mail.part_as_fileresponse(part_index)
        return response

__init__()

Constructor

Source code in thunderbird/tb_webserver.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def __init__(self):
    """
    Constructor

    Initializes the InputWebserver base with the configuration from
    ThunderbirdWebserver.get_config() and registers the routes of this
    webserver via the `app.get` and `ui.page` decorators. The handlers are
    closures over `self`.
    """
    InputWebserver.__init__(self, config=ThunderbirdWebserver.get_config())

    # plain HTTP route: deliver a single part of a mail
    @app.get("/part/{user}/{mailid}/{part_index:int}")
    async def get_part(user: str, mailid: str, part_index: int):
        # delegate to the method of the same name on the webserver instance
        return await self.get_part(
            user, mailid, part_index
        )

    # plain HTTP route: deliver a mail as wiki markup (text/plain)
    @app.get("/mail/{user}/{mailid}.wiki")
    def get_mail_wikimarkup(user: str, mailid: str):
        # get_mail raises an HTTPException (404) for an unknown user
        mail = self.get_mail(user, mailid)
        if (
            not mail.msg
        ):  # Assuming mail objects have a 'msg' attribute to check if the message exists
            # no message available: answer with 404 and use the mail's
            # error message as the detail
            html_markup = mail.as_html_error_msg()

            raise HTTPException(status_code=404, detail=html_markup)
        response = Response(content=mail.asWikiMarkup(), media_type="text/plain")
        return response

    # UI pages: each handler forwards the client, the ThunderbirdSolution
    # function to call and the path parameters to self.page

    @ui.page("/mail/{user}/{mailid}")
    async def showMail(client: Client,user: str, mailid: str):
        return await self.page(
            client,ThunderbirdSolution.showMail,
            user, mailid
        )

    # folder_path uses the `path` converter in the route pattern
    @ui.page("/folder/{user}/{folder_path:path}")
    async def showFolder(client: Client,user: str, folder_path: str):
        return await self.page(
            client,ThunderbirdSolution.show_folder,
            user, folder_path
        )

    @ui.page("/profile/{user}/{profile_key}/mailboxes")
    async def show_mailboxes(client: Client,user: str, profile_key: str):
        return await self.page(
            client,ThunderbirdSolution.show_mailboxes,
            user, profile_key
        )

    @ui.page("/profile/{user}/{profile_key}/search")
    async def show_search(client: Client,user: str, profile_key: str):
        return await self.page(
            client,ThunderbirdSolution.show_search,
            user, profile_key
        )

    @ui.page("/profile/{user}/{profile_key}/index")
    async def create_or_update_index(client: Client,user: str, profile_key: str):
        return await self.page(
            client,ThunderbirdSolution.create_or_update_index,
            user, profile_key
        )

    @ui.page("/profile/{user}/{profile_key}")
    async def show_folders(client: Client,user: str, profile_key: str):
        return await self.page(
            client,ThunderbirdSolution.show_folders,
            user, profile_key
        )

configure_run()

Configure this webserver, e.g. the allowed URLs.

Source code in thunderbird/tb_webserver.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def configure_run(self):
    """
    Configure this webserver for running and set up the mail archives.

    Runs the InputWebserver configuration first, then creates
    `self.mail_archives` for the users given in `self.args.user_list`.
    If that list is None or empty, the keys of the Thunderbird profile
    map are used instead, see
    https://github.com/WolfgangFahl/pyThunderbird/issues/19
    """
    InputWebserver.configure_run(self)
    users = self.args.user_list
    if not users:
        # no explicit user list: fall back to the users of the known profiles
        users = list(Thunderbird.getProfileMap().keys())
    self.mail_archives = MailArchives(users)

get_config() classmethod

Get the configuration for this webserver.

Source code in thunderbird/tb_webserver.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@classmethod
def get_config(cls) -> WebserverConfig:
    """
    Get the configuration for this webserver.

    Builds a WebserverConfig with the copyright notice, version, short name,
    default port and timeout of this server, passes it through
    `WebserverConfig.get` and sets ThunderbirdSolution as the solution class
    on the result.

    Returns:
        WebserverConfig: the configuration returned by `WebserverConfig.get`
    """
    base_config = WebserverConfig(
        copy_right="(c)2020-2024 Wolfgang Fahl",
        version=Version(),
        short_name="tbmail",
        default_port=8482,
        timeout=15.0,
    )
    resolved_config = WebserverConfig.get(base_config)
    resolved_config.solution_class = ThunderbirdSolution
    return resolved_config

get_mail(user, mailid)

Retrieves a specific mail for a given user by its mail identifier.

Parameters:

Name Type Description Default
user str

The username of the individual whose mail is to be retrieved.

required
mailid str

The unique identifier for the mail to be retrieved.

required

Returns:

Name Type Description
Any Any

Returns an instance of the Mail class corresponding to the specified mailid for the user.

Raises:

Type Description
HTTPException

If the user is not found in the mail archives, an HTTP exception with status code 404 is raised.

Source code in thunderbird/tb_webserver.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def get_mail(self, user: str, mailid: str) -> Any:
    """
    Look up a mail of the given user by its mail identifier.

    Args:
        user (str): The username of the individual whose mail is to be retrieved.
        mailid (str): The unique identifier for the mail to be retrieved.

    Returns:
        Any: A Mail instance created for the given `user` and `mailid`,
            using the archive entry registered for that user.

    Raises:
        HTTPException: With status code 404 if the user is not found in the mail archives.
    """
    archives = self.mail_archives.mail_archives
    # guard clause: unknown users are reported as "not found"
    if user not in archives:
        raise HTTPException(status_code=404, detail=f"User '{user}' not found")
    return Mail(user=user, mailid=mailid, tb=archives[user], debug=self.debug)

get_part(user, mailid, part_index) async

Asynchronously retrieves a specific part of a mail for a given user, identified by the mail's unique ID and the part index.

Parameters:

Name Type Description Default
user str

The username of the individual whose mail part is to be retrieved.

required
mailid str

The unique identifier for the mail whose part is to be retrieved.

required
part_index int

The index of the part within the mail to retrieve.

required

Returns:

Name Type Description
FileResponse FileResponse

A file response object containing the specified part of the mail.

Raises:

Type Description
HTTPException

If the user or the specified mail part does not exist, an HTTP exception could be raised.

Source code in thunderbird/tb_webserver.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def get_part(self, user: str, mailid: str, part_index: int) -> FileResponse:
    """
    Asynchronously retrieve one part of a mail as a file response.

    The mail lookup is delegated to `get_mail`, so an unknown user is
    reported as an HTTP 404 instead of surfacing as an unhandled KeyError.

    Args:
        user (str): The username of the individual whose mail part is to be retrieved.
        mailid (str): The unique identifier for the mail whose part is to be retrieved.
        part_index (int): The index of the part within the mail to retrieve.

    Returns:
        FileResponse: The result of `Mail.part_as_fileresponse` for the given part index.

    Raises:
        HTTPException: With status code 404 if the user is not found in the mail archives.
    """
    # get_mail validates the user and builds the Mail instance
    mail = self.get_mail(user, mailid)
    return mail.part_as_fileresponse(part_index)

version

Created on 2023-11-23

@author: wf

Version dataclass

Bases: object

Version handling for pyThunderbird

Source code in thunderbird/version.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@dataclass
class Version:
    """
    Version and project metadata for pyThunderbird
    """

    # NOTE: none of the attributes below are annotated, so they are plain
    # class attributes rather than dataclass fields.

    # project identity; the version is taken from the thunderbird package
    name = "pyThunderbird"
    version = thunderbird.__version__
    description = "python based access to Thunderbird mail"
    # creation date and date of the last update
    date = "2021-09-23"
    updated = "2024-08-01"

    authors = "Wolfgang Fahl"

    # documentation, discussion and source code locations
    doc_url = "https://wiki.bitplan.com/index.php/pyThunderbird"
    chat_url = "https://github.com/WolfgangFahl/pyThunderbird/discussions"
    cm_url = "https://github.com/WolfgangFahl/pyThunderbird"

    # static license text (no placeholders)
    license = """Copyright 2020-2023 contributors. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied."""

    # summary text assembled from the attributes above
    longDescription = (
        f"{name} version {version}\n"
        f"{description}\n"
        "\n"
        f"  Created by {authors} on {date} last updated {updated}"
    )