Skip to content

pynomaina API Documentation

date_utils

Created on 2024-10-02

@author: wf

DateUtils

date utilities

Source code in nomina/date_utils.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class DateUtils:
    """
    date utilities
    """

    @classmethod
    def parse_date(
        cls, date_str: str, date_formats: Optional[List[str]] = None
    ) -> Optional[str]:
        """
        Parse the given date string using the provided formats.

        Args:
            date_str (str): The date string to parse.
            date_formats (List[str], optional): List of date formats to try.
                If None, uses a default list of formats.

        Returns:
            Optional[str]: The parsed date in ISO format (YYYY-MM-DD) or None if parsing fails.
        """
        if date_formats is None:
            date_formats = [
                "%m.%d.%y",
                "%d.%m.%y",
                "%m/%d/%y",
                "%d/%m/%y",
                "%Y-%m-%d",
                "%Y/%m/%d",
                "%Y-%m-%d %H:%M:%S %z",  # Added to handle the GnuCash XML format
            ]

        for date_format in date_formats:
            try:
                date_obj = datetime.strptime(date_str, date_format)
                return date_obj.strftime("%Y-%m-%d")
            except ValueError:
                continue

        return None

    @classmethod
    def split_date_range(
        cls, start_date: str, end_date: str, num_ranges: int
    ) -> List[Tuple[str, str]]:
        """
        Splits a date range into a predefined number of sub-ranges.

        Args:
            start_date (str): The start date in "YYYY-MM-DD" format.
            end_date (str): The end date in "YYYY-MM-DD" format.
            num_ranges (int): The number of ranges to split into.

        Returns:
            List[Tuple[str, str]]: A list of tuples representing the sub-ranges.
        """
        start = datetime.strptime(start_date, "%Y-%m-%d")
        end = datetime.strptime(end_date, "%Y-%m-%d")
        total_days = (end - start).days

        base_range_length = total_days // num_ranges
        extra_days = total_days % num_ranges

        ranges = []
        current_start = start
        for i in range(num_ranges):
            range_length = base_range_length + (1 if i < extra_days else 0)
            current_end = current_start + timedelta(days=range_length - 1)

            ranges.append(
                (current_start.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d"))
            )
            current_start = current_end + timedelta(days=1)

        # Ensure the last range ends on the specified end date
        ranges[-1] = (ranges[-1][0], end_date)

        return ranges

parse_date(date_str, date_formats=None) classmethod

Parse the given date string using the provided formats.

Parameters:

Name Type Description Default
date_str str

The date string to parse.

required
date_formats List[str]

List of date formats to try. If None, uses a default list of formats.

None

Returns:

Type Description
Optional[str]

Optional[str]: The parsed date in ISO format (YYYY-MM-DD) or None if parsing fails.

Source code in nomina/date_utils.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@classmethod
def parse_date(
    cls, date_str: str, date_formats: Optional[List[str]] = None
) -> Optional[str]:
    """
    Parse the given date string using the provided formats.

    Args:
        date_str (str): The date string to parse.
        date_formats (List[str], optional): List of date formats to try.
            If None, uses a default list of formats.

    Returns:
        Optional[str]: The parsed date in ISO format (YYYY-MM-DD) or None if parsing fails.
    """
    if date_formats is None:
        date_formats = [
            "%m.%d.%y",
            "%d.%m.%y",
            "%m/%d/%y",
            "%d/%m/%y",
            "%Y-%m-%d",
            "%Y/%m/%d",
            "%Y-%m-%d %H:%M:%S %z",  # Added to handle the GnuCash XML format
        ]

    for date_format in date_formats:
        try:
            date_obj = datetime.strptime(date_str, date_format)
            return date_obj.strftime("%Y-%m-%d")
        except ValueError:
            continue

    return None

split_date_range(start_date, end_date, num_ranges) classmethod

Splits a date range into a predefined number of sub-ranges.

Parameters:

Name Type Description Default
start_date str

The start date in "YYYY-MM-DD" format.

required
end_date str

The end date in "YYYY-MM-DD" format.

required
num_ranges int

The number of ranges to split into.

required

Returns:

Type Description
List[Tuple[str, str]]

List[Tuple[str, str]]: A list of tuples representing the sub-ranges.

Source code in nomina/date_utils.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@classmethod
def split_date_range(
    cls, start_date: str, end_date: str, num_ranges: int
) -> List[Tuple[str, str]]:
    """
    Splits a date range into a predefined number of sub-ranges.

    Args:
        start_date (str): The start date in "YYYY-MM-DD" format.
        end_date (str): The end date in "YYYY-MM-DD" format.
        num_ranges (int): The number of ranges to split into.

    Returns:
        List[Tuple[str, str]]: A list of tuples representing the sub-ranges.
    """
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    total_days = (end - start).days

    base_range_length = total_days // num_ranges
    extra_days = total_days % num_ranges

    ranges = []
    current_start = start
    for i in range(num_ranges):
        range_length = base_range_length + (1 if i < extra_days else 0)
        current_end = current_start + timedelta(days=range_length - 1)

        ranges.append(
            (current_start.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d"))
        )
        current_start = current_end + timedelta(days=1)

    # Ensure the last range ends on the specified end date
    ranges[-1] = (ranges[-1][0], end_date)

    return ranges

gnucash

Created on 2024-10-02

@author: wf

GnuCashXml

GnuCash XML reader/writer

Source code in nomina/gnucash.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
class GnuCashXml:
    """
    GnuCash XML reader/writer
    """

    def __init__(self, indent: str = "  "):
        """
        constructor
        using two space indentation
        """
        self.indent = indent
        self.namespaces = {
            "gnc": "http://www.gnucash.org/XML/gnc",
            "act": "http://www.gnucash.org/XML/act",
            "book": "http://www.gnucash.org/XML/book",
            "cd": "http://www.gnucash.org/XML/cd",
            "cmdty": "http://www.gnucash.org/XML/cmdty",
            "price": "http://www.gnucash.org/XML/price",
            "slot": "http://www.gnucash.org/XML/slot",
            "split": "http://www.gnucash.org/XML/split",
            "sx": "http://www.gnucash.org/XML/sx",
            "trn": "http://www.gnucash.org/XML/trn",
            "ts": "http://www.gnucash.org/XML/ts",
            "fs": "http://www.gnucash.org/XML/fs",
            "bgt": "http://www.gnucash.org/XML/bgt",
            "recurrence": "http://www.gnucash.org/XML/recurrence",
            "lot": "http://www.gnucash.org/XML/lot",
            "addr": "http://www.gnucash.org/XML/addr",
            "billterm": "http://www.gnucash.org/XML/billterm",
            "bt-days": "http://www.gnucash.org/XML/bt-days",
            "bt-prox": "http://www.gnucash.org/XML/bt-prox",
            "cust": "http://www.gnucash.org/XML/cust",
            "employee": "http://www.gnucash.org/XML/employee",
            "entry": "http://www.gnucash.org/XML/entry",
            "invoice": "http://www.gnucash.org/XML/invoice",
            "job": "http://www.gnucash.org/XML/job",
            "order": "http://www.gnucash.org/XML/order",
            "owner": "http://www.gnucash.org/XML/owner",
            "taxtable": "http://www.gnucash.org/XML/taxtable",
            "tte": "http://www.gnucash.org/XML/tte",
            "vendor": "http://www.gnucash.org/XML/vendor",
        }

    def parse_gnucash_xml(self, xml_file: str) -> GncV2:
        parser = XmlParser(config=ParserConfig(fail_on_unknown_properties=False))
        return parser.parse(xml_file, GncV2)

    def xml_format(self, xml_string: str) -> str:
        """
        adapt the format of the xml_string to gnu cash conventions
        """
        # unindent two spaces twice
        formatted_xml = re.sub(r"(\n  )", r"\n", xml_string)
        formatted_xml = re.sub(r"(\n  )", r"\n", formatted_xml)

        # formatting of xmlns attributes
        xmlns_indent = "     "  # Five spaces
        formatted_xml = re.sub(
            r'\s(xmlns:[^=]+="[^"]+")', f"\n{xmlns_indent}\\1", formatted_xml
        )

        # Ensure there's a space before the closing ?> in the XML declaration
        formatted_xml = formatted_xml.replace("?>", " ?>")

        # Consistent empty element formatting
        formatted_xml = re.sub(r"<([^>]+)\/>", r"<\1/>", formatted_xml)

        # add a new line at end
        formatted_xml += "\n"

        return formatted_xml

    def write_gnucash_xml(self, gnucash_data: GncV2, output_file: str) -> None:
        """
        Serialize the GnuCash data object to an XML file.

        Args:
            gnucash_data (GnuCashXml): The GnuCash data object to serialize.
            output_file (str): The file path where the XML will be written.
        """
        serializer = XmlSerializer(
            config=SerializerConfig(
                pretty_print=True,
                xml_declaration=True,
                encoding="utf-8",
                indent=self.indent,
            )
        )

        with io.StringIO() as xml_buffer:
            serializer.write(xml_buffer, gnucash_data, ns_map=self.namespaces)
            xml_string = xml_buffer.getvalue()

        # Apply the custom filter to the XML string
        formatted_xml_string = self.xml_format(xml_string)

        # Write the formatted XML string to the file
        with open(output_file, "w", encoding="UTF-8") as f:
            f.write(formatted_xml_string)

    def get_stats(self, gncv2: GncV2) -> Dict:
        dates = []
        parse_errors = 0
        for tx in gncv2.book.transactions:
            if tx.date_posted and tx.date_posted.date:
                parsed_date = DateUtils.parse_date(tx.date_posted.date)
                if parsed_date:
                    dates.append(datetime.strptime(parsed_date, "%Y-%m-%d"))
                else:
                    parse_errors += 1

        if dates:
            min_date = min(dates).strftime("%Y-%m-%d")
            max_date = max(dates).strftime("%Y-%m-%d")
        else:
            min_date = max_date = None

        return {
            "#transactions": len(gncv2.book.transactions),
            "#accounts": len(gncv2.book.accounts),
            "date_range": {"start_date": min_date, "end_date": max_date},
            "parse_errors": parse_errors,
        }

    def show_summary(self, gncv2: GncV2):
        stats = self.get_stats(gncv2)
        self.stats = stats
        print(f"#accounts: {stats['#accounts']}")
        print(f"#transactions: {stats['#transactions']}")
        print(
            f"Date range: {stats['date_range']['start_date']} to {stats['date_range']['end_date']}"
        )

__init__(indent=' ')

constructor using two space indentation

Source code in nomina/gnucash.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def __init__(self, indent: str = "  "):
    """
    constructor
    using two space indentation
    """
    self.indent = indent
    self.namespaces = {
        "gnc": "http://www.gnucash.org/XML/gnc",
        "act": "http://www.gnucash.org/XML/act",
        "book": "http://www.gnucash.org/XML/book",
        "cd": "http://www.gnucash.org/XML/cd",
        "cmdty": "http://www.gnucash.org/XML/cmdty",
        "price": "http://www.gnucash.org/XML/price",
        "slot": "http://www.gnucash.org/XML/slot",
        "split": "http://www.gnucash.org/XML/split",
        "sx": "http://www.gnucash.org/XML/sx",
        "trn": "http://www.gnucash.org/XML/trn",
        "ts": "http://www.gnucash.org/XML/ts",
        "fs": "http://www.gnucash.org/XML/fs",
        "bgt": "http://www.gnucash.org/XML/bgt",
        "recurrence": "http://www.gnucash.org/XML/recurrence",
        "lot": "http://www.gnucash.org/XML/lot",
        "addr": "http://www.gnucash.org/XML/addr",
        "billterm": "http://www.gnucash.org/XML/billterm",
        "bt-days": "http://www.gnucash.org/XML/bt-days",
        "bt-prox": "http://www.gnucash.org/XML/bt-prox",
        "cust": "http://www.gnucash.org/XML/cust",
        "employee": "http://www.gnucash.org/XML/employee",
        "entry": "http://www.gnucash.org/XML/entry",
        "invoice": "http://www.gnucash.org/XML/invoice",
        "job": "http://www.gnucash.org/XML/job",
        "order": "http://www.gnucash.org/XML/order",
        "owner": "http://www.gnucash.org/XML/owner",
        "taxtable": "http://www.gnucash.org/XML/taxtable",
        "tte": "http://www.gnucash.org/XML/tte",
        "vendor": "http://www.gnucash.org/XML/vendor",
    }

write_gnucash_xml(gnucash_data, output_file)

Serialize the GnuCash data object to an XML file.

Parameters:

Name Type Description Default
gnucash_data GnuCashXml

The GnuCash data object to serialize.

required
output_file str

The file path where the XML will be written.

required
Source code in nomina/gnucash.py
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def write_gnucash_xml(self, gnucash_data: GncV2, output_file: str) -> None:
    """
    Serialize the GnuCash data object to an XML file.

    Args:
        gnucash_data (GnuCashXml): The GnuCash data object to serialize.
        output_file (str): The file path where the XML will be written.
    """
    serializer = XmlSerializer(
        config=SerializerConfig(
            pretty_print=True,
            xml_declaration=True,
            encoding="utf-8",
            indent=self.indent,
        )
    )

    with io.StringIO() as xml_buffer:
        serializer.write(xml_buffer, gnucash_data, ns_map=self.namespaces)
        xml_string = xml_buffer.getvalue()

    # Apply the custom filter to the XML string
    formatted_xml_string = self.xml_format(xml_string)

    # Write the formatted XML string to the file
    with open(output_file, "w", encoding="UTF-8") as f:
        f.write(formatted_xml_string)

xml_format(xml_string)

adapt the format of the xml_string to gnu cash conventions

Source code in nomina/gnucash.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
def xml_format(self, xml_string: str) -> str:
    """
    adapt the format of the xml_string to gnu cash conventions
    """
    # unindent two spaces twice
    formatted_xml = re.sub(r"(\n  )", r"\n", xml_string)
    formatted_xml = re.sub(r"(\n  )", r"\n", formatted_xml)

    # formatting of xmlns attributes
    xmlns_indent = "     "  # Five spaces
    formatted_xml = re.sub(
        r'\s(xmlns:[^=]+="[^"]+")', f"\n{xmlns_indent}\\1", formatted_xml
    )

    # Ensure there's a space before the closing ?> in the XML declaration
    formatted_xml = formatted_xml.replace("?>", " ?>")

    # Consistent empty element formatting
    formatted_xml = re.sub(r"<([^>]+)\/>", r"<\1/>", formatted_xml)

    # add a new line at end
    formatted_xml += "\n"

    return formatted_xml

TsDate dataclass

Source code in nomina/gnucash.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass
class TsDate:
    date: Optional[str] = field(
        default=None,
        metadata={
            "type": "Element",
            "namespace": "http://www.gnucash.org/XML/ts",
        },
    )

    def __post_init__(self):
        """
        format date to 1970-01-01 00:00:00 +0000
        """
        if self.date and len(self.date)==10:
            self.date += " 00:00:00 +0000"

__post_init__()

format date to 1970-01-01 00:00:00 +0000

Source code in nomina/gnucash.py
64
65
66
67
68
69
def __post_init__(self):
    """
    format date to 1970-01-01 00:00:00 +0000
    """
    if self.date and len(self.date)==10:
        self.date += " 00:00:00 +0000"

ledger

Created on 04.10.2024

@author: wf

Account

Represents a ledger account.

Source code in nomina/ledger.py
15
16
17
18
19
20
21
22
23
24
25
26
@lod_storable
class Account:
    """
    Represents a ledger account.
    """

    account_id: str
    name: str
    account_type: str
    description: Optional[str] = ""
    currency: str = "EUR"  # Default to EUR
    parent_account_id: Optional[str] = None

Book

Represents a ledger book containing accounts and transactions.

Source code in nomina/ledger.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@lod_storable
class Book:
    """
    Represents a ledger book containing accounts and transactions.
    """

    name: Optional[str] = None
    owner: Optional[str] = None
    since: Optional[str] = None
    url: Optional[str] = None
    accounts: Dict[str, Account] = field(default_factory=dict)
    transactions: Dict[str, Transaction] = field(default_factory=dict)

    def __post_init__(self):
        """
        post construct actions
        """

    def get_stats(self) -> Stats:
        """
        Get statistics about the Book.

        Returns:
            Stats: An object containing various statistics about the Book.
        """
        # Calculate date range
        dates = [
            datetime.strptime(tx.isodate.split()[0], "%Y-%m-%d")
            for tx in self.transactions.values()
            if tx.isodate
        ]
        if dates:
            min_date = min(dates).strftime("%Y-%m-%d")
            max_date = max(dates).strftime("%Y-%m-%d")
        else:
            min_date = max_date = None

        return Stats(
            accounts=len(self.accounts),
            transactions=len(self.transactions),
            start_date=min_date,
            end_date=max_date
        )


    def filter(self, start_date: str = None, end_date: str = None) -> "Book":
        """
        Filter the transactions based on the given date range.

        Args:
            start_date (str): The start date in 'YYYY-MM-DD' format.
            end_date (str): The end date in 'YYYY-MM-DD' format.

        Returns:
            Book: A new Book object with filtered transactions.
        """
        filtered_transactions = {}

        for transaction_id, transaction in self.transactions.items():
            transaction_date = transaction.isodate.split()[
                0
            ]  # Extract 'YYYY-MM-DD' part

            in_range = (not start_date or transaction_date >= start_date) and (
                not end_date or transaction_date <= end_date
            )

            if in_range:
                filtered_transactions[transaction_id] = transaction

        filtered_book = deepcopy(self)
        filtered_book.transactions = filtered_transactions
        return filtered_book

    def create_account(
        self,
        name: str,
        account_type: str = "EXPENSE",
        parent_account_id: Optional[str] = None,
    ) -> Account:
        """
        Create a ledger account with the given parameters.

        Args:
            name (str): The name of the account.
            account_type (str): The type of the account. Defaults to "EXPENSE".
            parent_account_id (Optional[str]): The id of the parent account, if any.

        Returns:
            Account: A new Account object.
        """
        # Calculate the account ID based on the parent's account ID (if any)
        if parent_account_id:
            parent_account = self.lookup_account(parent_account_id)
            if parent_account:
                account_id = f"{parent_account_id}:{name}"
            else:
                raise ValueError(f"invalid parent account {parent_account_id}")
        else:
            account_id = name  # top level account

        # Create the account
        account = Account(
            account_id=account_id,
            name=name,
            account_type=account_type,
            parent_account_id=parent_account_id,
        )
        self.add_account(account)

    def add_account(self, account: Account):
        """
        add the given account
        """
        self.accounts[account.account_id] = account
        return account

    def lookup_account(self, account_id: str) -> Optional[Account]:
        """
        Get the account for the given account id.

        Args:
            account_id(str): The id of the account to look up.

        Returns:
            Optional[Account]: The found account or None if not found.
        """
        return self.accounts.get(account_id)

__post_init__()

post construct actions

Source code in nomina/ledger.py
75
76
77
78
def __post_init__(self):
    """
    post construct actions
    """

add_account(account)

add the given account

Source code in nomina/ledger.py
172
173
174
175
176
177
def add_account(self, account: Account):
    """
    add the given account
    """
    self.accounts[account.account_id] = account
    return account

create_account(name, account_type='EXPENSE', parent_account_id=None)

Create a ledger account with the given parameters.

Parameters:

Name Type Description Default
name str

The name of the account.

required
account_type str

The type of the account. Defaults to "EXPENSE".

'EXPENSE'
parent_account_id Optional[str]

The id of the parent account, if any.

None

Returns:

Name Type Description
Account Account

A new Account object.

Source code in nomina/ledger.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def create_account(
    self,
    name: str,
    account_type: str = "EXPENSE",
    parent_account_id: Optional[str] = None,
) -> Account:
    """
    Create a ledger account with the given parameters.

    Args:
        name (str): The name of the account.
        account_type (str): The type of the account. Defaults to "EXPENSE".
        parent_account_id (Optional[str]): The id of the parent account, if any.

    Returns:
        Account: A new Account object.
    """
    # Calculate the account ID based on the parent's account ID (if any)
    if parent_account_id:
        parent_account = self.lookup_account(parent_account_id)
        if parent_account:
            account_id = f"{parent_account_id}:{name}"
        else:
            raise ValueError(f"invalid parent account {parent_account_id}")
    else:
        account_id = name  # top level account

    # Create the account
    account = Account(
        account_id=account_id,
        name=name,
        account_type=account_type,
        parent_account_id=parent_account_id,
    )
    self.add_account(account)

filter(start_date=None, end_date=None)

Filter the transactions based on the given date range.

Parameters:

Name Type Description Default
start_date str

The start date in 'YYYY-MM-DD' format.

None
end_date str

The end date in 'YYYY-MM-DD' format.

None

Returns:

Name Type Description
Book Book

A new Book object with filtered transactions.

Source code in nomina/ledger.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def filter(self, start_date: str = None, end_date: str = None) -> "Book":
    """
    Filter the transactions based on the given date range.

    Args:
        start_date (str): The start date in 'YYYY-MM-DD' format.
        end_date (str): The end date in 'YYYY-MM-DD' format.

    Returns:
        Book: A new Book object with filtered transactions.
    """
    filtered_transactions = {}

    for transaction_id, transaction in self.transactions.items():
        transaction_date = transaction.isodate.split()[
            0
        ]  # Extract 'YYYY-MM-DD' part

        in_range = (not start_date or transaction_date >= start_date) and (
            not end_date or transaction_date <= end_date
        )

        if in_range:
            filtered_transactions[transaction_id] = transaction

    filtered_book = deepcopy(self)
    filtered_book.transactions = filtered_transactions
    return filtered_book

get_stats()

Get statistics about the Book.

Returns:

Name Type Description
Stats Stats

An object containing various statistics about the Book.

Source code in nomina/ledger.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def get_stats(self) -> Stats:
    """
    Get statistics about the Book.

    Returns:
        Stats: An object containing various statistics about the Book.
    """
    # Calculate date range
    dates = [
        datetime.strptime(tx.isodate.split()[0], "%Y-%m-%d")
        for tx in self.transactions.values()
        if tx.isodate
    ]
    if dates:
        min_date = min(dates).strftime("%Y-%m-%d")
        max_date = max(dates).strftime("%Y-%m-%d")
    else:
        min_date = max_date = None

    return Stats(
        accounts=len(self.accounts),
        transactions=len(self.transactions),
        start_date=min_date,
        end_date=max_date
    )

lookup_account(account_id)

Get the account for the given account id.

Parameters:

Name Type Description Default
account_id(str)

The id of the account to look up.

required

Returns:

Type Description
Optional[Account]

Optional[Account]: The found account or None if not found.

Source code in nomina/ledger.py
179
180
181
182
183
184
185
186
187
188
189
def lookup_account(self, account_id: str) -> Optional[Account]:
    """
    Get the account for the given account id.

    Args:
        account_id(str): The id of the account to look up.

    Returns:
        Optional[Account]: The found account or None if not found.
    """
    return self.accounts.get(account_id)

Split

Represents a split in a transaction.

Source code in nomina/ledger.py
29
30
31
32
33
34
35
36
37
38
@lod_storable
class Split:
    """
    Represents a split in a transaction.
    """

    amount: float
    account_id: str
    memo: Optional[str] = ""
    reconciled: bool = False

Transaction

Represents a transaction in the ledger.

Source code in nomina/ledger.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@lod_storable
class Transaction:
    """
    Represents a transaction in the ledger.
    """

    isodate: str
    description: str
    splits: List[Split] = field(default_factory=list)
    payee: Optional[str] = None
    memo: Optional[str] = ""

    def total_amount(self) -> float:
        """
        Calculates the total amount of the transaction.
        Returns:
            float: The sum of all split amounts.
        """
        return sum(split.amount for split in self.splits)

total_amount()

Calculates the total amount of the transaction. Returns: float: The sum of all split amounts.

Source code in nomina/ledger.py
53
54
55
56
57
58
59
def total_amount(self) -> float:
    """
    Calculates the total amount of the transaction.
    Returns:
        float: The sum of all split amounts.
    """
    return sum(split.amount for split in self.splits)

nomina_cmd

Created on 2024-10-06

@author: wf

NominaCmd

Bases: WebserverCmd

command line handling for nomina

Source code in nomina/nomina_cmd.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class NominaCmd(WebserverCmd):
    """
    command line handling for nomina
    """

    def __init__(self):
        """
        constructor
        """
        config = NominaWebServer.get_config()
        WebserverCmd.__init__(self, config, NominaWebServer, DEBUG)
        pass

    def getArgParser(self, description: str, version_msg) -> ArgumentParser:
        """
        override the default argparser call
        """
        parser = super().getArgParser(description, version_msg)
        parser.add_argument(
            "-v",
            "--verbose",
            action="store_true",
            help="show verbose output [default: %(default)s]",
        )
        parser.add_argument(
            "-rp",
            "--root_path",
            default=NominaWebServer.examples_path(),
            help="path to nomina files [default: %(default)s]",
        )
        return parser

__init__()

constructor

Source code in nomina/nomina_cmd.py
20
21
22
23
24
25
26
def __init__(self):
    """
    constructor
    """
    config = NominaWebServer.get_config()
    WebserverCmd.__init__(self, config, NominaWebServer, DEBUG)
    pass

getArgParser(description, version_msg)

override the default argparser call

Source code in nomina/nomina_cmd.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def getArgParser(self, description: str, version_msg) -> ArgumentParser:
    """
    override the default argparser call
    """
    parser = super().getArgParser(description, version_msg)
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="show verbose output [default: %(default)s]",
    )
    parser.add_argument(
        "-rp",
        "--root_path",
        default=NominaWebServer.examples_path(),
        help="path to nomina files [default: %(default)s]",
    )
    return parser

main(argv=None)

main call

Source code in nomina/nomina_cmd.py
48
49
50
51
52
53
54
def main(argv: list = None):
    """
    main call
    """
    cmd = NominaCmd()
    exit_code = cmd.cmd_main(argv)
    return exit_code

qif

Created on 2024-10-01

Quicken Interchange Format (QIF) Parser see https://en.wikipedia.org/wiki/Quicken_Interchange_Format

@author: wf

Category dataclass

Bases: ParseRecord

a QIF tag (Class or Category)

Source code in nomina/qif.py
71
72
73
74
75
76
77
78
@lod_storable
class Category(ParseRecord):
    """
    a QIF tag (Class or Category)
    """

    name: Optional[str] = None
    description: str = ""

QifClass dataclass

Bases: ParseRecord

a QIF tag (Class or Category)

Source code in nomina/qif.py
81
82
83
84
85
86
87
88
@lod_storable
class QifClass(ParseRecord):
    """
    a QIF tag (Class or Category)
    """

    name: Optional[str] = None
    description: str = ""

SimpleQifParser

a QIF parser

Source code in nomina/qif.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
@lod_storable
class SimpleQifParser:
    """
    a QIF parser
    """

    currency: str = "EUR"
    default_account_type = "EXPENSE"
    options: Dict[str, str] = field(default_factory=dict)
    classes: Dict[str, QifClass] = field(default_factory=dict)
    categories: Dict[str, Category] = field(default_factory=dict)
    accounts: Dict[str, Account] = field(default_factory=dict)
    transactions: Dict[str, Transaction] = field(default_factory=dict)
    accounts: Dict[str, Account] = field(default_factory=dict)
    errors: List[ErrorRecord] = field(default_factory=list)

    def __post_init__(self):
        self.current_account = None
        self.field_names = {
            "$": "split_amount",
            "~": "~?",
            "&": "&?",
            "%": "%?",
            "@": "@?",
            "A": "address",
            "B": "B?",
            "C": "cleared",
            "D": "isodate",
            "E": "split_memo",
            "F": "F?",
            "G": "G?",
            "I": "I?",
            "K": "K?",
            "L": "category",
            "M": "memo",
            "N": "name",
            "O": "O?",
            "R": "R?",
            "P": "payee",
            "Q": "Q?",
            "S": "split_category",
            "T": "amount",
            "U": "amount_unknown",
            "V": "V?",
            "Y": "Y?",
        }

    def parse_file(
        self,
        qif_file: str,
        encoding="iso-8859-1",
        verbose: bool = False,
        debug: bool = False,
    ):
        """
        parse a qif file

        Args:
            qif_file (str): Path to the input QIF file.
            encoding (str): File encoding. Defaults to 'iso-8859-1'.
            verbose (bool): if True give verbose output
            debug (bool): if True show debug output
        """
        with open(qif_file, "r", encoding=encoding) as file:
            content = file.readlines()
        self.parse(content, verbose=verbose, debug=debug)

    def parse(self, lines: List[str], verbose: bool = False, debug: bool = False):
        """
        parse the given list of lines
        """
        current_record = {}
        record_type = None
        start_line = 1

        for line_num, line in enumerate(lines, 1):
            line = line.strip()
            if not line:
                continue
            if debug:
                print(f"{line_num}:{line}")
            if line.startswith("$"):
                self.currency = "USD"
            elif line.startswith("€"):
                self.currency = "EUR"
            if line.startswith("!Option:"):
                option = line[8:]
                self.options[option] = True
            elif line.startswith("!Clear:"):
                option = line[8:]
                self.options[option] = False
            elif line.startswith("!Type:") or line.startswith("!Account"):
                if current_record:
                    self._add_record(
                        record_type, current_record, start_line, line_num - 1
                    )
                if line.startswith("!Account"):
                    record_type = "Account"
                else:
                    record_type = line[6:]  # Text after !Type:
                    pass
                current_record = {}
                start_line = line_num + 1
            elif line == "^":
                if current_record:
                    self._add_record(record_type, current_record, start_line, line_num)
                current_record = {}
                start_line = line_num + 1

            elif line[0] in self.field_names:
                first = line[0]
                key = self.field_names.get(first)
                value = line[1:].strip()
                if key in ["split_category", "split_memo", "split_amount"]:
                    if key not in current_record:
                        current_record[key] = []
                    current_record[key].append(value)
                else:
                    current_record[key] = value
            else:
                error = ErrorRecord(start_line=start_line, end_line=line_num, line=line)
                err_msg = f"parser can not handle line {line_num}: {line}"
                if verbose or debug:
                    logging.error(err_msg)
                self.errors.append(error)

        if current_record:
            self._add_record(record_type, current_record, start_line, len(lines))

    def _add_account(
        self,
        account_name: str,
        account_type: str,
        description: str,
        start_line,
        end_line,
    ) -> Account:
        """
        add an account for the given parameters making sure the parent account is created if need be
        """
        parts = account_name.split(":")
        name = parts[-1]
        parent_name = ":".join(parts[:-1]) if len(parts) > 1 else None
        parent_id = parent_name
        if account_type is None:
            account_type = self.default_account_type
        if parent_name and parent_name not in self.accounts:
            self.accounts[parent_name] = Account(
                name=parent_name,
                account_type=account_type,
                currency=self.currency,
                start_line=start_line,
                end_line=end_line,
            )
        account = Account(
            name=name,
            description=description,
            account_type=account_type,
            currency=self.currency,
            parent_account_id=parent_id,
            start_line=start_line,
            end_line=end_line,
        )
        self.accounts[account.name] = account
        return account

    def _add_record(
        self, record_type: str, record: Dict[str, Any], start_line: int, end_line: int
    ):
        """
        add the given record
        """
        record["_start_line"] = start_line
        record["_end_line"] = end_line
        if record_type == "Account":
            # @TODO allow external lookup of currency since quicken does not have it
            # Determine if the account has a parent by checking for a ':' in the name
            account_name = record.get("name", "")
            account_type = account_type = record.get("account_type")
            description = record.get("description", "")
            self.current_account = self._add_account(
                account_name,
                account_type=account_type,
                description=description,
                start_line=start_line,
                end_line=end_line,
            )
        elif record_type == "Class":
            qclass = QifClass(
                name=record.get("name", ""),
                description=record.get("description", ""),
                start_line=start_line,
                end_line=end_line,
            )
            self.classes[qclass.name] = qclass
        elif record_type == "Cat":
            cat = Category(
                name=record.get("name", ""),
                description=record.get("description", ""),
                start_line=start_line,
                end_line=end_line,
            )
            self.categories[cat.name] = cat
        else:
            tx = self.tx_for_record(record)
            if self.current_account:
                tx.account = self.current_account
                account_name = self.current_account.name
                tx_id = f"{account_name}:{tx.isodate}:{tx.start_line}"
            else:
                tx_id = f"{tx.isodate}:{tx.start_line}"

            self.transactions[tx_id] = tx

    def tx_for_record(self, t):
        """
        convert the transaction record
        """
        transaction = Transaction(start_line=t["_start_line"], end_line=t["_end_line"])

        for key, value in t.items():
            if key.startswith("_"):
                continue
            setattr(transaction, key, value)
        transaction.normalize()
        return transaction

    def get_lod(self) -> List[Dict[str, Any]]:
        """
        Get a list of dictionaries representing all transactions.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries, each representing a transaction.
        """
        lod = []
        for tx in self.transactions.values():
            record = {
                # "tx_id": f"{self.current_account.name}:{tx.isodate}:{tx.start_line}",
                # "account": self.current_account.name,
                "isodate": tx.isodate,
                "amount": tx.amount,
                "payee": tx.payee,
                "memo": tx.memo,
                "category": tx.category,
                "number": tx.number,
                "cleared": tx.cleared,
                "address": tx.address,
                "split_category": (
                    ",".join(tx.split_category) if tx.split_category else None
                ),
                "split_memo": ",".join(tx.split_memo) if tx.split_memo else None,
                "split_amount": (
                    ",".join(map(str, tx.split_amount)) if tx.split_amount else None
                ),
                "qif_class": tx.qif_class.name if tx.qif_class else None,
            }
            lod.append(record)
        return lod

    def print_sample_transactions(self, num_samples: int = 7):
        print(f"\nSample of {min(num_samples, len(self.transactions))} transactions:")
        txs = list(self.transactions.values())[:num_samples]
        for idx, transaction in enumerate(txs, 1):
            print(
                f"Transaction {idx} (lines {transaction.start_line}-{transaction.end_line}):"
            )
            for field, value in vars(transaction).items():
                if field not in ["start_line", "end_line", "errors"]:
                    if isinstance(value, list):
                        print(f"  {field}: {', '.join(map(str, value))}")
                    else:
                        print(f"  {field}: {value}")
            if transaction.errors:
                print("  Errors:")
                for field, error in transaction.errors.items():
                    print(f"    {field}: {type(error).__name__}: {str(error)}")
            print()

    def print_parts(self, parts: dict, title: str, limit: int = 100000):
        print(f"\n{title}:")
        for i, part in enumerate(parts.values(), start=1):
            if i >= limit:
                break
            print(
                f"{i:3}: {part.name} - {part.description} (#{part.start_line}-{part.end_line})"
            )

    def get_stats(self) -> Stats:
        dates = [
            datetime.strptime(tx.isodate, "%Y-%m-%d")
            for tx in self.transactions.values()
            if tx.isodate
        ]
        if dates:
            min_date = min(dates).strftime("%Y-%m-%d")
            max_date = max(dates).strftime("%Y-%m-%d")
        else:
            min_date = max_date = None

        other_stats = {
            "options": self.options,
            "field_histogram": self._get_field_histogram(),
            "error_histogram": self._get_error_histogram(),
        }

        return Stats(
            accounts=len(self.accounts),
            transactions=len(self.transactions),
            start_date=min_date,
            end_date=max_date,
            classes=len(self.classes),
            categories=len(self.categories),
            errors=len(self.errors),
            other=other_stats
        )


    def _get_field_histogram(self) -> Dict[str, int]:
        field_counter = Counter()
        for transaction in self.transactions.values():
            for field, value in vars(transaction).items():
                if field not in ["start_line", "end_line", "errors"]:
                    if value is not None and (
                        not isinstance(value, list) or len(value) > 0
                    ):
                        field_counter[field] += 1
        return dict(field_counter)

    def _get_error_histogram(self) -> Dict[str, int]:
        error_counter = Counter()
        for transaction in self.transactions.values():
            error_counter.update(transaction.errors.keys())
        return dict(error_counter)

    def generate_error_report(self, max_errors_per_type=10):
        """
        Generate a detailed error report.

        Args:
            max_errors_per_type (int): Maximum number of errors to show for each error type.

        Returns:
            str: A formatted error report.
        """
        error_types = {}
        for transaction in self.transactions.values():
            for field, error in transaction.errors.items():
                error_type = type(error).__name__
                if error_type not in error_types:
                    error_types[error_type] = []
                error_types[error_type].append((transaction, field, error))

        report = ["Detailed Error Report:"]
        for error_type, errors in error_types.items():
            report.append(f"\n{error_type} ({len(errors)} occurrences):")
            for i, (transaction, field, error) in enumerate(
                errors[:max_errors_per_type], 1
            ):
                report.append(
                    f"  {i}. Line {transaction.start_line}-{transaction.end_line}, Field: {field}"
                )
                report.append(f"     Error: {str(error)}")
                report.append(
                    f"     Transaction: {self.transaction_summary(transaction)}"
                )
            if len(errors) > max_errors_per_type:
                report.append(f"  ... and {len(errors) - max_errors_per_type} more.")

        return "\n".join(report)

    def transaction_summary(self, transaction):
        """Helper function to provide a summary of a transaction."""
        summary = []
        for field, value in vars(transaction).items():
            if field not in ["start_line", "end_line", "errors"] and value is not None:
                summary.append(f"{field}: {value}")
        return ", ".join(summary)

    def show_summary(self, limit: int = 7):
        stats = self.get_stats()
        self.stats = stats

        # Display basic statistics
        print(f"Options: {stats.other.get('options', {})}")
        print(f"#classes: {stats.classes}")
        print(f"#categories: {stats.categories}")
        print(f"#accounts: {stats.accounts}")
        print(f"#transactions: {stats.transactions}")
        print(f"#errors: {stats.errors}")
        print(f"Date range: {stats.start_date} to {stats.end_date}")

        # Display field and error histograms
        print("Field histogram:")
        field_histogram = stats.other.get('field_histogram', {})
        for field, count in field_histogram.items():
            print(f"  {field}: {count}")

        print("Error histogram:")
        error_histogram = stats.other.get('error_histogram', {})
        for field, count in error_histogram.items():
            print(f"  {field}: {count}")

        # Display parts
        self.print_parts(self.accounts, "Accounts", limit=limit)
        self.print_parts(self.classes, "Classes", limit=limit)
        self.print_parts(self.categories, "Categories", limit=limit)

        # Display sample transactions
        print("\nSample Transactions:")
        self.print_sample_transactions()

        # Generate and display error report
        error_report = self.generate_error_report()
        print(error_report)

generate_error_report(max_errors_per_type=10)

Generate a detailed error report.

Parameters:

Name Type Description Default
max_errors_per_type int

Maximum number of errors to show for each error type.

10

Returns:

Name Type Description
str

A formatted error report.

Source code in nomina/qif.py
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
def generate_error_report(self, max_errors_per_type=10):
    """
    Generate a detailed error report.

    Args:
        max_errors_per_type (int): Maximum number of errors to show for each error type.

    Returns:
        str: A formatted error report.
    """
    error_types = {}
    for transaction in self.transactions.values():
        for field, error in transaction.errors.items():
            error_type = type(error).__name__
            if error_type not in error_types:
                error_types[error_type] = []
            error_types[error_type].append((transaction, field, error))

    report = ["Detailed Error Report:"]
    for error_type, errors in error_types.items():
        report.append(f"\n{error_type} ({len(errors)} occurrences):")
        for i, (transaction, field, error) in enumerate(
            errors[:max_errors_per_type], 1
        ):
            report.append(
                f"  {i}. Line {transaction.start_line}-{transaction.end_line}, Field: {field}"
            )
            report.append(f"     Error: {str(error)}")
            report.append(
                f"     Transaction: {self.transaction_summary(transaction)}"
            )
        if len(errors) > max_errors_per_type:
            report.append(f"  ... and {len(errors) - max_errors_per_type} more.")

    return "\n".join(report)

get_lod()

Get a list of dictionaries representing all transactions.

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: A list of dictionaries, each representing a transaction.

Source code in nomina/qif.py
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def get_lod(self) -> List[Dict[str, Any]]:
    """
    Get a list of dictionaries representing all transactions.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, each representing a transaction.
    """
    lod = []
    for tx in self.transactions.values():
        record = {
            # "tx_id": f"{self.current_account.name}:{tx.isodate}:{tx.start_line}",
            # "account": self.current_account.name,
            "isodate": tx.isodate,
            "amount": tx.amount,
            "payee": tx.payee,
            "memo": tx.memo,
            "category": tx.category,
            "number": tx.number,
            "cleared": tx.cleared,
            "address": tx.address,
            "split_category": (
                ",".join(tx.split_category) if tx.split_category else None
            ),
            "split_memo": ",".join(tx.split_memo) if tx.split_memo else None,
            "split_amount": (
                ",".join(map(str, tx.split_amount)) if tx.split_amount else None
            ),
            "qif_class": tx.qif_class.name if tx.qif_class else None,
        }
        lod.append(record)
    return lod

parse(lines, verbose=False, debug=False)

parse the given list of lines

Source code in nomina/qif.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def parse(self, lines: List[str], verbose: bool = False, debug: bool = False):
    """
    parse the given list of lines
    """
    current_record = {}
    record_type = None
    start_line = 1

    for line_num, line in enumerate(lines, 1):
        line = line.strip()
        if not line:
            continue
        if debug:
            print(f"{line_num}:{line}")
        if line.startswith("$"):
            self.currency = "USD"
        elif line.startswith("€"):
            self.currency = "EUR"
        if line.startswith("!Option:"):
            option = line[8:]
            self.options[option] = True
        elif line.startswith("!Clear:"):
            option = line[8:]
            self.options[option] = False
        elif line.startswith("!Type:") or line.startswith("!Account"):
            if current_record:
                self._add_record(
                    record_type, current_record, start_line, line_num - 1
                )
            if line.startswith("!Account"):
                record_type = "Account"
            else:
                record_type = line[6:]  # Text after !Type:
                pass
            current_record = {}
            start_line = line_num + 1
        elif line == "^":
            if current_record:
                self._add_record(record_type, current_record, start_line, line_num)
            current_record = {}
            start_line = line_num + 1

        elif line[0] in self.field_names:
            first = line[0]
            key = self.field_names.get(first)
            value = line[1:].strip()
            if key in ["split_category", "split_memo", "split_amount"]:
                if key not in current_record:
                    current_record[key] = []
                current_record[key].append(value)
            else:
                current_record[key] = value
        else:
            error = ErrorRecord(start_line=start_line, end_line=line_num, line=line)
            err_msg = f"parser can not handle line {line_num}: {line}"
            if verbose or debug:
                logging.error(err_msg)
            self.errors.append(error)

    if current_record:
        self._add_record(record_type, current_record, start_line, len(lines))

parse_file(qif_file, encoding='iso-8859-1', verbose=False, debug=False)

parse a qif file

Parameters:

Name Type Description Default
qif_file str

Path to the input QIF file.

required
encoding str

File encoding. Defaults to 'iso-8859-1'.

'iso-8859-1'
verbose bool

if True give verbose output

False
debug bool

if True show debug output

False
Source code in nomina/qif.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def parse_file(
    self,
    qif_file: str,
    encoding="iso-8859-1",
    verbose: bool = False,
    debug: bool = False,
):
    """
    parse a qif file

    Args:
        qif_file (str): Path to the input QIF file.
        encoding (str): File encoding. Defaults to 'iso-8859-1'.
        verbose (bool): if True give verbose output
        debug (bool): if True show debug output
    """
    with open(qif_file, "r", encoding=encoding) as file:
        content = file.readlines()
    self.parse(content, verbose=verbose, debug=debug)

transaction_summary(transaction)

Helper function to provide a summary of a transaction.

Source code in nomina/qif.py
544
545
546
547
548
549
550
def transaction_summary(self, transaction):
    """Helper function to provide a summary of a transaction."""
    summary = []
    for field, value in vars(transaction).items():
        if field not in ["start_line", "end_line", "errors"] and value is not None:
            summary.append(f"{field}: {value}")
    return ", ".join(summary)

tx_for_record(t)

convert the transaction record

Source code in nomina/qif.py
388
389
390
391
392
393
394
395
396
397
398
399
def tx_for_record(self, t):
    """
    convert the transaction record
    """
    transaction = Transaction(start_line=t["_start_line"], end_line=t["_end_line"])

    for key, value in t.items():
        if key.startswith("_"):
            continue
        setattr(transaction, key, value)
    transaction.normalize()
    return transaction

Transaction dataclass

Bases: ParseRecord

a single transaction

Source code in nomina/qif.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@lod_storable
class Transaction(ParseRecord):
    """
    a single transaction
    """

    isodate: Optional[str] = None
    amount: Optional[str] = None
    payee: Optional[str] = None
    memo: Optional[str] = None
    category: Optional[str] = None
    number: Optional[str] = None
    cleared: Optional[str] = None
    address: Optional[str] = None
    split_category: List[str] = field(default_factory=list)
    split_memo: List[str] = field(default_factory=list)
    split_amount: List[str] = field(default_factory=list)
    account: Optional[Account] = None
    qif_class: Optional[QifClass] = None
    category: Optional[Category] = None

    def __post_init__(self):
        self.amount_float: Optional[float] = None
        self.split_amounts_float: List[float] = []
        self.normalize()
        pass

    def normalize(self):
        """
        Normalize the transaction data, converting string amounts to floats.
        """
        try:
            if self.isodate:
                self.isodate = DateUtils.parse_date(self.isodate)
        except Exception as ex:
            self.errors["date"] = ex

        try:
            if self.amount:
                self.amount_float = self.parse_amount(self.amount)
        except Exception as ex:
            self.errors["amount"] = ex

        self.split_amounts_float = []
        for i, amount in enumerate(self.split_amount):
            try:
                self.split_amounts_float.append(self.parse_amount(amount))
            except Exception as ex:
                self.errors[f"split{i}"].append(ex)

    def parse_amount(self, amount_str: str) -> float:
        # Remove any currency symbols and whitespace
        cleaned_str = re.sub(r"[^\d,.-]", "", amount_str)
        # Replace comma with dot if comma is used as decimal separator
        if "," in cleaned_str and "." not in cleaned_str:
            cleaned_str = cleaned_str.replace(",", ".")
        elif "," in cleaned_str and "." in cleaned_str:
            cleaned_str = cleaned_str.replace(",", "")
        try:
            return float(cleaned_str)
        except ValueError:
            raise ValueError(f"Unable to parse amount: {amount_str}")

    def total_split_amount(self) -> float:
        """
        Calculate the total amount for split transactions.

        Returns:
            float: The sum of all split amounts.
        """
        return sum(self.split_amounts_float)

normalize()

Normalize the transaction data, converting string amounts to floats.

Source code in nomina/qif.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def normalize(self):
    """
    Normalize the transaction data, converting string amounts to floats.
    """
    try:
        if self.isodate:
            self.isodate = DateUtils.parse_date(self.isodate)
    except Exception as ex:
        self.errors["date"] = ex

    try:
        if self.amount:
            self.amount_float = self.parse_amount(self.amount)
    except Exception as ex:
        self.errors["amount"] = ex

    self.split_amounts_float = []
    for i, amount in enumerate(self.split_amount):
        try:
            self.split_amounts_float.append(self.parse_amount(amount))
        except Exception as ex:
            self.errors[f"split{i}"].append(ex)

total_split_amount()

Calculate the total amount for split transactions.

Returns:

Name Type Description
float float

The sum of all split amounts.

Source code in nomina/qif.py
164
165
166
167
168
169
170
171
def total_split_amount(self) -> float:
    """
    Calculate the total amount for split transactions.

    Returns:
        float: The sum of all split amounts.
    """
    return sum(self.split_amounts_float)

stats

Created on 2024-10-06

@author: wf

Stats

Ledger statistics

Source code in nomina/stats.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
@lod_storable
class Stats:
    """
    Ledger statistics
    """
    accounts: int
    transactions: int
    start_date: Optional[str] = None
    end_date: Optional[str] = None
    classes: Optional[int] = None
    categories: Optional[int] = None
    errors: Optional[int] = None
    other: Optional[Dict[str, Any]] = field(default_factory=dict)

version

Created on 2024-10-06 @author: wf

Version dataclass

Bases: object

Version handling for MoneyBrowser

Source code in nomina/version.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@dataclass
class Version(object):
    """
    Version handling for MoneyBrowser
    """
    name = "pynomina"
    version = "0.0.1"
    date = "2024-10-06"
    updated = "2024-10-06"
    description = "Personal finance tool"
    authors = "Wolfgang Fahl"

    doc_url = "https://wiki.bitplan.com/index.php/Pynomina"
    chat_url = "https://github.com/WolfgangFahl/pynomina/discussions/"
    cm_url = "https://github.com/WolfgangFahl/pynomina"

    license = f"""Copyright 2024 contributors. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied."""
    longDescription = f"""{name} version {version}
{description}

  Created by {authors} on {date} last updated {updated}"""

webserver

Created on 2024-10-06

@author: wf

NominaSolution

Bases: InputWebSolution

the Nomina solution

Source code in nomina/webserver.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class NominaSolution(InputWebSolution):
    """
    the Nomina solution
    """

    def __init__(self, webserver: NominaWebServer, client: Client):
        """
        Initialize the solution

        Calls the constructor of the base solution
        Args:
            webserver (NiceScadWebServer): The webserver instance associated with this context.
            client (Client): The client instance this context is associated with.
        """
        super().__init__(webserver, client)  # Call to the superclass constructor

__init__(webserver, client)

Initialize the solution

Calls the constructor of the base solution Args: webserver (NiceScadWebServer): The webserver instance associated with this context. client (Client): The client instance this context is associated with.

Source code in nomina/webserver.py
58
59
60
61
62
63
64
65
66
67
def __init__(self, webserver: NominaWebServer, client: Client):
    """
    Initialize the solution

    Calls the constructor of the base solution
    Args:
        webserver (NiceScadWebServer): The webserver instance associated with this context.
        client (Client): The client instance this context is associated with.
    """
    super().__init__(webserver, client)  # Call to the superclass constructor

NominaWebServer

Bases: InputWebserver

Nomina Webserver

Source code in nomina/webserver.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class NominaWebServer(InputWebserver):
    """
    Nomina Webserver
    """

    @classmethod
    def get_config(cls) -> WebserverConfig:
        copy_right = "(c)2024 Wolfgang Fahl"
        config = WebserverConfig(
            copy_right=copy_right,
            version=Version(),
            default_port=9849,
            short_name="nomina",
        )
        server_config = WebserverConfig.get(config)
        server_config.solution_class = NominaSolution
        return server_config

    def __init__(self):
        """Constructs all the necessary attributes for the WebServer object."""
        InputWebserver.__init__(self, config=NominaWebServer.get_config())

    def configure_run(self):
        root_path = (
            self.args.root_path
            if self.args.root_path
            else NominaWebServer.examples_path()
        )
        self.root_path = os.path.abspath(root_path)
        self.allowed_urls = [
            self.examples_path(),
            self.root_path,
        ]

    @classmethod
    def examples_path(cls) -> str:
        # the root directory (default: examples)
        path = os.path.join(os.path.dirname(__file__), "../nomina_examples")
        path = os.path.abspath(path)
        return path

__init__()

Constructs all the necessary attributes for the WebServer object.

Source code in nomina/webserver.py
30
31
32
def __init__(self):
    """Constructs all the necessary attributes for the WebServer object."""
    InputWebserver.__init__(self, config=NominaWebServer.get_config())