Skip to content

scan2wiki API Documentation

amazon

Created on 2023-11-16

@author: wf

Amazon

lookup products on amazon web site

Source code in scan/amazon.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class Amazon:
    """
    lookup products on amazon web site

    The search result page of amazon.de is fetched with a randomly
    assembled User-Agent header and parsed into Product objects.
    """

    def __init__(self, debug: Optional[bool] = False):
        """
        constructor

        Args:
            debug (bool, optional): If set to True, pretty-prints the first product div for debugging.
        """
        self.debug = debug

    def extract_amazon_products(self, soup: BeautifulSoup) -> List[Product]:
        """
        Extracts product information from Amazon product listing HTML content.

        Rows without a linked title are skipped. A missing image, link or
        price element leaves the corresponding Product field as an empty
        string instead of raising.

        Args:
            soup (BeautifulSoup): Soup object of HTML content of the Amazon product listing page.

        Returns:
            List[Product]: A list of extracted product information as Product objects.
        """
        products = []
        # Find all div elements that match the product listing structure
        for index, div in enumerate(soup.find_all("div", class_="puisg-row")):
            # Pretty-print the first product div if debug is True
            if self.debug and index == 0:
                print("Debug - First Product Div:")
                print(div.prettify())

            # Only rows with a linked title are turned into products
            title_div = div.find("h2", class_="a-size-mini")
            if not (title_div and title_div.a):
                continue
            title = title_div.a.get_text(strip=True)

            # Image URL and ASIN both come from the image container
            image_url = ""
            asin = ""
            image_div = div.find("div", class_="s-product-image-container")
            if image_div and image_div.a:
                img = image_div.img
                if img is not None:
                    # a container may hold a link without an <img> element
                    image_url = img.get("src", "")
                href = image_div.a.get("href", "")
                if "/dp/" in href:
                    # the ASIN is the path segment following "/dp/"
                    asin = href.split("/dp/")[-1].split("/")[0].split("?")[0]

            # Price text with the non-breaking space replaced by a blank
            price = ""
            price_span = div.find("span", class_="a-price")
            offscreen = (
                price_span.find("span", class_="a-offscreen") if price_span else None
            )
            if offscreen:
                price = offscreen.get_text(strip=True).replace("\xa0", " ")

            products.append(
                Product(title=title, image_url=image_url, price=price, asin=asin)
            )

        return products

    def get_headers(self):
        """
        build the HTTP headers for a request

        Returns:
            dict: a dict with a single "User-Agent" entry that is assembled
                from randomly chosen browser, operating system and
                platform fragments
        """
        # Possible components of a user agent string
        browsers = ["Chrome", "Firefox", "Safari", "Edge"]
        operating_systems = [
            "Windows NT 10.0; Win64; x64",
            "Macintosh; Intel Mac OS X 10_15_7",
            "X11; Linux x86_64",
        ]
        platforms = [
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Gecko/20100101 Firefox/76.0",
            "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1 Safari/605.1.15",
        ]

        # Randomly select one component from each category
        # (local names chosen so that the os/platform modules are not shadowed)
        browser = random.choice(browsers)
        operating_system = random.choice(operating_systems)
        engine = random.choice(platforms)

        # Construct the user agent string
        user_agent = f"Mozilla/5.0 ({operating_system}) {engine} {browser}/58.0.3029.110"

        return {"User-Agent": user_agent}

    def lookup_products(self, search_key: str, timeout: float = 30.0):
        """
        lookup the given search key e.g. ISBN or EAN

        Args:
            search_key (str): the search term, sent as the "k" query parameter
            timeout (float): seconds to wait for the server before giving up

        Returns:
            List[Product]: the products extracted from the result page

        Raises:
            Exception: if the HTTP status code of the response is not 200
        """
        # let requests encode the query so that characters such as
        # "&", "#" or blanks in the search key cannot break the URL
        response = requests.get(
            "https://www.amazon.de/s",
            params={"k": search_key},
            headers=self.get_headers(),
            timeout=timeout,
        )
        if response.status_code != 200:
            msg = f"lookup for {search_key} failed with HTML status code {response.status_code}"
            raise Exception(msg)
        soup = BeautifulSoup(response.content, "html.parser")
        return self.extract_amazon_products(soup)

__init__(debug=False)

constructor

Parameters:

Name Type Description Default
debug bool

If set to True, pretty-prints the first product div for debugging.

False
Source code in scan/amazon.py
20
21
22
23
24
25
26
27
def __init__(self, debug: Optional[bool] = False):
    """
    create the lookup helper

    Args:
        debug (bool, optional): when True, the first product div of a parsed result page is pretty-printed.
    """
    self.debug = debug

extract_amazon_products(soup)

Extracts product information from Amazon product listing HTML content.

Parameters:

Name Type Description Default
soup BeautifulSoup

Soup object of HTML content of the Amazon product listing page.

required

Returns:

Type Description
List[Product]

List[Product]: A list of extracted product information as Product objects.

Source code in scan/amazon.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def extract_amazon_products(self, soup: BeautifulSoup) -> List[Product]:
    """
    Extracts product information from Amazon product listing HTML content.

    Rows without a linked title are skipped. A missing image, link or
    price element leaves the corresponding Product field as an empty
    string instead of raising.

    Args:
        soup (BeautifulSoup): Soup object of HTML content of the Amazon product listing page.

    Returns:
        List[Product]: A list of extracted product information as Product objects.
    """
    products = []
    # Find all div elements that match the product listing structure
    for index, div in enumerate(soup.find_all("div", class_="puisg-row")):
        # Pretty-print the first product div if debug is True
        if self.debug and index == 0:
            print("Debug - First Product Div:")
            print(div.prettify())

        # Only rows with a linked title are turned into products
        title_div = div.find("h2", class_="a-size-mini")
        if not (title_div and title_div.a):
            continue
        title = title_div.a.get_text(strip=True)

        # Image URL and ASIN both come from the image container
        image_url = ""
        asin = ""
        image_div = div.find("div", class_="s-product-image-container")
        if image_div and image_div.a:
            img = image_div.img
            if img is not None:
                # a container may hold a link without an <img> element
                image_url = img.get("src", "")
            href = image_div.a.get("href", "")
            if "/dp/" in href:
                # the ASIN is the path segment following "/dp/"
                asin = href.split("/dp/")[-1].split("/")[0].split("?")[0]

        # Price text with the non-breaking space replaced by a blank
        price = ""
        price_span = div.find("span", class_="a-price")
        offscreen = (
            price_span.find("span", class_="a-offscreen") if price_span else None
        )
        if offscreen:
            price = offscreen.get_text(strip=True).replace("\xa0", " ")

        products.append(
            Product(title=title, image_url=image_url, price=price, asin=asin)
        )

    return products

lookup_products(search_key)

lookup the given search key e.g. ISBN or EAN

Source code in scan/amazon.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def lookup_products(self, search_key: str, timeout: float = 30.0):
    """
    lookup the given search key e.g. ISBN or EAN

    Args:
        search_key (str): the search term, sent as the "k" query parameter
        timeout (float): seconds to wait for the server before giving up

    Returns:
        the list of products that extract_amazon_products finds in the result page

    Raises:
        Exception: if the HTTP status code of the response is not 200
    """
    # let requests encode the query so that characters such as
    # "&", "#" or blanks in the search key cannot break the URL
    response = requests.get(
        "https://www.amazon.de/s",
        params={"k": search_key},
        headers=self.get_headers(),
        timeout=timeout,
    )
    if response.status_code != 200:
        msg = f"lookup for {search_key} failed with HTML status code {response.status_code}"
        raise Exception(msg)
    soup = BeautifulSoup(response.content, "html.parser")
    return self.extract_amazon_products(soup)

barcode

Created on 2023-11-16

@author: wf

Barcode dataclass

Barcode data structure with static methods, e.g. a pyzbar barcode decoder wrapper

Source code in scan/barcode.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@dataclass
class Barcode:
    """
    Barcode data structure with
    static methods e.g. pyzbar barcode decoder wrapper
    """

    # decoded barcode content as text
    code: str
    # barcode type as reported by the decoder
    type: str
    # orientation as reported by the decoder
    orientation: str
    # bounding rectangle of the barcode converted to a dict
    rect: Optional[dict] = None
    # polygon points of the barcode, each converted to a dict
    polygon: Optional[List[dict]] = None
    # quality value as reported by the decoder
    quality: Optional[int] = None

    @staticmethod
    def decode(image_file_path: str, debug: bool = False) -> List["Barcode"]:
        """
        Decodes barcodes from the image at the given file path.

        Args:
            image_file_path (str): The file path of the image to decode.
            debug (bool): If False, suppress debug information of the PIL library. Default is False.

        Returns:
            list[Barcode]: A list of Barcode objects, or an empty list if no barcodes are found.
        """
        if not debug:
            # Suppress debug messages
            logging.getLogger("PIL").setLevel(logging.INFO)
        # Open the saved image and make sure the file handle is released
        # again once the barcodes have been decoded
        with Image.open(image_file_path) as image:
            # inside this method "decode" is the module level decoder
            # function, not this static method
            barcodes = decode(image)
        return [
            Barcode(
                code=barcode.data.decode("utf-8"),
                type=barcode.type,
                rect=barcode.rect._asdict(),
                polygon=[point._asdict() for point in barcode.polygon],
                quality=barcode.quality,
                orientation=barcode.orientation,
            )
            for barcode in barcodes
        ]

decode(image_file_path, debug=False) staticmethod

Decodes barcodes from the image at the given file path.

Parameters:

Name Type Description Default
image_file_path str

The file path of the image to decode.

required
debug bool

If False, suppress debug information of the PIL library. Default is False.

False

Returns:

Type Description

list[Barcode]: A list of Barcode objects, or an empty list if no barcodes are found.

Source code in scan/barcode.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@staticmethod
def decode(image_file_path: str, debug: bool = False) -> List["Barcode"]:
    """
    Decodes barcodes from the image at the given file path.

    Args:
        image_file_path (str): The file path of the image to decode.
        debug (bool): If False, suppress debug information of the PIL library. Default is False.

    Returns:
        list[Barcode]: A list of Barcode objects, or an empty list if no barcodes are found.
    """
    if not debug:
        # Suppress debug messages
        logging.getLogger("PIL").setLevel(logging.INFO)
    # Open the saved image and make sure the file handle is released
    # again once the barcodes have been decoded
    with Image.open(image_file_path) as image:
        barcodes = decode(image)
    return [
        Barcode(
            code=barcode.data.decode("utf-8"),
            type=barcode.type,
            rect=barcode.rect._asdict(),
            polygon=[point._asdict() for point in barcode.polygon],
            quality=barcode.quality,
            orientation=barcode.orientation,
        )
        for barcode in barcodes
    ]

dms

Created on 2021-10-21

@author: wf

see http://diagrams.bitplan.com/render/png/0xe1f1d160.png see http://diagrams.bitplan.com/render/txt/0xe1f1d160.txt

Archive

Bases: JSONAble

an Archive might be a filesystem on a server or a (semantic) mediawiki

Source code in scan/dms.py
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
class Archive(JSONAble):
    """
    an Archive might be a filesystem
    on a server or a (semantic) mediawiki
    """

    def __init__(self):
        """
        Constructor
        """

    @classmethod
    def getSamples(cls):
        """
        get sample records for archives

        Returns:
            list: a list of dicts with the keys server, name, url,
                wikiid, folderCount and documentCount
        """
        samplesLOD = [
            {
                "server": "wiki.bitplan.com",
                "name": "wiki",
                "url": "http://wiki.bitplan.com",
                "wikiid": "wiki",
                "folderCount": 0,
                "documentCount": 0,
            },
            {
                "server": "media.bitplan.com",
                "name": "media",
                "url": "http://media.bitplan.com",
                "wikiid": "media",
                "folderCount": 9,
                "documentCount": 551,
            },
        ]
        return samplesLOD

    def normalizePageTitle(self, pageTitle):
        """
        normalize the given pageTitle

        Args:
            pageTitle(str): the page title to normalize

        Returns:
            str: the page title with blanks replaced by underscores
        """
        nPageTitle = pageTitle.replace(" ", "_")
        return nPageTitle

    def getFoldersAndDocuments(self, withOcr=False):
        """
        get the folders and documents of this archive

        If the archive has a wikiid the documents are the pages of
        Category:OCRDocument of that wiki and the folders are derived
        from the first category of each page. Otherwise the directory
        tree derived from the url of the archive is walked.

        Args:
            withOcr(bool): passed on to Folder.getDocuments for archives
                that point to a folder

        Return:
            tuple: a dict of folders by path and the list of documents
        """
        foldersByPath = {}
        documentList = []
        # this archive is pointing to a wiki
        if getattr(self, "wikiid", None) is not None:
            smw = Wiki.getSMW(self.wikiid)
            # NOTE(review): the result of the "|format=count" query is not
            # used below - only the second (plain) query is evaluated
            for option in ["|format=count", ""]:
                askQuery = (
                    """{{#ask: [[Category:OCRDocument]]  
| mainlabel=page
| ?Category
| ?Modification date=lastModified
| ?Creation date=created
|limit=1000
%s
}}"""
                    % option
                )
                print(askQuery)
                result = smw.query(askQuery)
                baseUrl = f"{smw.site.scheme}://{smw.site.host}{smw.site.path}index.php"
                if option == "":
                    folderCounter = Counter()
                    folderCreated = {}
                    folderLastModified = {}
                    for record in result.values():
                        page = record["page"]
                        # the category column may be named in German
                        if "Kategorie" in record:
                            catname = "Kategorie"
                            categories = record["Kategorie"]
                        else:
                            catname = "Category"
                            categories = record["Category"]
                        doc = Document()
                        doc.archiveName = self.name
                        # the first category of a page is used as its folder
                        if isinstance(categories, list):
                            firstCategory = categories[0]
                        else:
                            firstCategory = categories
                        doc.folderPath = firstCategory.replace(f"{catname}:", "")
                        doc.lastModified = record["lastModified"]
                        doc.created = record["created"]
                        folderCounter[doc.folderPath] += 1
                        # a folder is as old as its oldest document ...
                        if doc.created:
                            if doc.folderPath in folderCreated:
                                folderCreated[doc.folderPath] = min(
                                    doc.created, folderCreated[doc.folderPath]
                                )
                            else:
                                folderCreated[doc.folderPath] = doc.created
                        # ... and was last modified with its newest document
                        if doc.lastModified:
                            if doc.folderPath in folderLastModified:
                                folderLastModified[doc.folderPath] = max(
                                    doc.lastModified, folderLastModified[doc.folderPath]
                                )
                            else:
                                folderLastModified[doc.folderPath] = doc.lastModified

                        doc.name = page
                        doc.url = f"{baseUrl}/{self.normalizePageTitle(page)}"
                        documentList.append(doc)
                    # collect folders
                    for folderName, count in folderCounter.most_common():
                        folder = Folder()
                        folder.archiveName = self.name
                        folder.name = folderName
                        folder.path = folderName
                        if folderName in folderLastModified:
                            folder.lastModified = folderLastModified[folderName]
                        if folderName in folderCreated:
                            folder.created = folderCreated[folderName]
                        folder.url = f"{baseUrl}/Category:{folderName}"
                        folder.fileCount = count
                        foldersByPath[folderName] = folder
        else:
            # this archive is pointing to a folder
            # escape the server name so that its dots only match literal dots
            pattern = rf"http://{re.escape(self.server)}"
            folderPath = re.sub(pattern, "", self.url)
            basePath = Folder.getFullpath(folderPath)
            for root, dirs, _files in os.walk(basePath):
                relbase = Folder.getRelpath(root)
                # loop over all directories
                for dirname in dirs:
                    if not dirname.startswith("."):
                        folder = Folder()
                        folder.archive = self
                        fullpath = os.path.join(root, dirname)
                        folder.path = os.path.join(relbase, dirname)
                        folder.archiveName = self.name
                        folder.url = f"http://{self.server}{folder.path}"
                        folder.name = dirname
                        # files in folder ...
                        pdfFiles = folder.getFiles()
                        folder.fileCount = len(pdfFiles)
                        # the modification time also serves as creation time
                        folder.lastModified = DMSStorage.getDatetime(fullpath)
                        folder.created = folder.lastModified
                        folderDocuments = folder.getDocuments(pdfFiles, withOcr=withOcr)
                        # add the results
                        documentList.extend(folderDocuments)
                        foldersByPath[folder.path] = folder
        return foldersByPath, documentList

__init__()

Constructor

Source code in scan/dms.py
670
671
672
673
def __init__(self):
    """
    construct an Archive - no attributes are set here
    """

getFoldersAndDocuments(withOcr=False)

get the folders of this archive

Return

the list of folders and files

Source code in scan/dms.py
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
    def getFoldersAndDocuments(self, withOcr=False):
        """
        get the folders of this archive

        Return:
            the list of folders and files
        """
        foldersByPath = {}
        documentList = []
        # this archive is pointing to a wiki
        if hasattr(self, "wikiid") and self.wikiid is not None:
            smw = Wiki.getSMW(self.wikiid)
            for option in ["|format=count", ""]:
                askQuery = (
                    """{{#ask: [[Category:OCRDocument]]  
| mainlabel=page
| ?Category
| ?Modification date=lastModified
| ?Creation date=created
|limit=1000
%s
}}"""
                    % option
                )
                print(askQuery)
                result = smw.query(askQuery)
                baseUrl = f"{smw.site.scheme}://{smw.site.host}{smw.site.path}index.php"
                if option == "":
                    folderCounter = Counter()
                    folderCreated = {}
                    folderLastModified = {}
                    for record in result.values():
                        page = record["page"]
                        if "Kategorie" in record:
                            catname = "Kategorie"
                            categories = record["Kategorie"]
                        else:
                            catname = "Category"
                            categories = record["Category"]
                        doc = Document()
                        doc.archiveName = self.name
                        if isinstance(categories, list):
                            firstCategory = categories[0]
                        else:
                            firstCategory = categories
                        doc.folderPath = firstCategory.replace(f"{catname}:", "")
                        # print(f"{firstCategory}->{doc.folderPath}")
                        doc.lastModified = record["lastModified"]
                        doc.created = record["created"]
                        folderCounter[doc.folderPath] += 1
                        if doc.created:
                            if doc.folderPath in folderCreated:
                                folderCreated[doc.folderPath] = min(
                                    doc.created, folderCreated[doc.folderPath]
                                )
                            else:
                                folderCreated[doc.folderPath] = doc.created
                        if doc.lastModified:
                            if doc.folderPath in folderLastModified:
                                folderLastModified[doc.folderPath] = max(
                                    doc.lastModified, folderLastModified[doc.folderPath]
                                )
                            else:
                                folderLastModified[doc.folderPath] = doc.lastModified

                        doc.name = page
                        doc.url = f"{baseUrl}/{self.normalizePageTitle(page)}"
                        documentList.append(doc)
                    # collect folders
                    for folderName, count in folderCounter.most_common():
                        folder = Folder()
                        folder.archiveName = self.name
                        folder.name = folderName
                        folder.path = folderName
                        if folderName in folderLastModified:
                            folder.lastModified = folderLastModified[folderName]
                        if folderName in folderCreated:
                            folder.created = folderCreated[folderName]
                        folder.url = f"{baseUrl}/Category:{folderName}"
                        folder.fileCount = count
                        foldersByPath[folderName] = folder
                        pass
        else:
            # this archive is pointing to a folder
            pattern = rf"http://{self.server}"
            folderPath = re.sub(pattern, "", self.url)
            basePath = Folder.getFullpath(folderPath)
            for root, dirs, files in os.walk(basePath):
                relbase = Folder.getRelpath(root)
                # loop over all directories
                for dirname in dirs:
                    if not dirname.startswith("."):
                        folder = Folder()
                        folder.archive = self
                        fullpath = os.path.join(root, dirname)
                        folder.path = os.path.join(relbase, dirname)
                        folder.archiveName = self.name
                        folder.url = f"http://{self.server}{folder.path}"
                        folder.name = dirname
                        # files in folder ...
                        pdfFiles = folder.getFiles()
                        folder.fileCount = len(pdfFiles)
                        folder.lastModified = DMSStorage.getDatetime(fullpath)
                        folder.created = folder.lastModified
                        folderDocuments = folder.getDocuments(pdfFiles, withOcr=withOcr)
                        # add the results
                        documentList.extend(folderDocuments)
                        foldersByPath[folder.path] = folder
            pass
        return foldersByPath, documentList

normalizePageTitle(pageTitle)

normalize the given pageTitle

Source code in scan/dms.py
697
698
699
700
701
702
def normalizePageTitle(self, pageTitle):
    """
    convert the given pageTitle to its underscore form

    Args:
        pageTitle(str): the page title to normalize

    Returns:
        str: the page title with every blank replaced by an underscore
    """
    return "_".join(pageTitle.split(" "))

ArchiveManager

Bases: EntityManager

manager for Archives

Source code in scan/dms.py
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
class ArchiveManager(EntityManager):
    """
    manager for Archives
    """

    def __init__(self, mode="sql", debug=False):
        """
        constructor

        Args:
            mode(str): the storage mode passed to DMSStorage.getStorageConfig
            debug(bool): True if debugging should be enabled
        """
        name = "archive"
        entityName = "Archive"
        entityPluralName = "archives"
        listName = entityPluralName
        clazz = Archive
        tableName = name
        config = DMSStorage.getStorageConfig(mode=mode, debug=debug)
        handleInvalidListTypes = True
        filterInvalidListTypes = True
        primaryKey = "url"
        super().__init__(
            name,
            entityName,
            entityPluralName,
            listName,
            clazz,
            tableName,
            primaryKey,
            config,
            handleInvalidListTypes,
            filterInvalidListTypes,
            debug,
        )

    @staticmethod
    def getInstance(mode=None):
        """
        get an ArchiveManager for the given mode

        Without a mode an sql based manager is returned. If that manager
        is not cached yet, its archives are taken from a json based
        manager and stored first.

        Args:
            mode(str): the storage mode or None for the sql default

        Returns:
            ArchiveManager: the archive manager
        """
        if mode is not None:
            return ArchiveManager(mode)
        ams = ArchiveManager(mode="sql")
        if not ams.isCached():
            # seed the sql storage from the json storage
            amj = ArchiveManager(mode="json")
            amj.fromCache()
            ams.archives = amj.archives
            ams.store()
        DMSStorage.fromCache(ams)
        return ams

    @staticmethod
    def addFilesAndFoldersForArchive(
        archive=None, withOcr=False, store=False, debug=True
    ):
        """
        add Files and folder for the given Archive

        Args:
            archive(Archive): the archive to add files and folder for
            withOcr(bool): passed on to archive.getFoldersAndDocuments
            store(bool): True if the result should be stored in the storage
            debug(bool): True if debugging messages should be displayed
        """
        if archive is None:
            return
        msg = f"getting folders for {archive.name}"
        if debug:
            print(msg)
        afoldersByPath, documentList = archive.getFoldersAndDocuments(withOcr=withOcr)
        folders = list(afoldersByPath.values())
        msg = f"found {len(afoldersByPath)} folders in {archive.name}"
        if debug:
            print(msg)
        if store:
            # empty results are not written to the sql storage
            if folders:
                fms = FolderManager(mode="sql")
                fms.folders = folders
                fms.store(append=True, replace=True)
            if documentList:
                dms = DocumentManager(mode="sql")
                dms.documents = documentList
                dms.store(append=True, replace=True)

__init__(mode='sql', debug=False)

constructor

Source code in scan/dms.py
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
def __init__(self, mode="sql", debug=False):
    """
    set up the entity manager for archives

    Args:
        mode(str): the storage mode passed to DMSStorage.getStorageConfig
        debug(bool): passed to the storage configuration and the super constructor
    """
    storageConfig = DMSStorage.getStorageConfig(mode=mode, debug=debug)
    # the arguments are positional - the trailing comments give the role
    # of each value
    super().__init__(
        "archive",  # name
        "Archive",  # entityName
        "archives",  # entityPluralName
        "archives",  # listName
        Archive,  # clazz
        "archive",  # tableName
        "url",  # primaryKey
        storageConfig,  # config
        True,  # handleInvalidListTypes
        True,  # filterInvalidListTypes
        debug,
    )

addFilesAndFoldersForArchive(archive=None, withOcr=False, store=False, debug=True) staticmethod

add Files and folder for the given Archive

Parameters:

Name Type Description Default
archive(Archive)

the archive to add files and folder for

required
store(bool)

True if the result should be stored in the storage

required
debug(bool)

True if debugging messages should be displayed

required
Source code in scan/dms.py
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
@staticmethod
def addFilesAndFoldersForArchive(
    archive=None, withOcr=False, store=False, debug=True
):
    """
    add Files and folder for the given Archive

    Args:
        archive(Archive): the archive to add files and folder for
        withOcr(bool): passed on to archive.getFoldersAndDocuments
        store(bool): True if the result should be stored in the storage
        debug(bool): True if debugging messages should be displayed
    """
    if archive is None:
        return
    msg = f"getting folders for {archive.name}"
    if debug:
        print(msg)
    afoldersByPath, documentList = archive.getFoldersAndDocuments(withOcr=withOcr)
    folders = list(afoldersByPath.values())
    msg = f"found {len(afoldersByPath)} folders in {archive.name}"
    if debug:
        print(msg)
    if store:
        # empty results are not written to the sql storage
        if folders:
            fms = FolderManager(mode="sql")
            fms.folders = folders
            fms.store(append=True, replace=True)
        if documentList:
            dms = DocumentManager(mode="sql")
            dms.documents = documentList
            dms.store(append=True, replace=True)

DMSStorage

Document management system storage configuration

Source code in scan/dms.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class DMSStorage:
    """
    Document management system storage configuration
    """

    # class-wide switches that getStorageConfig copies onto every configuration it returns
    profile = True
    withShowProgress = True

    @staticmethod
    def getStorageConfig(debug: bool = False, mode="sql") -> StorageConfig:
        """
        get the storageConfiguration

        Args:
            debug(bool): if True show debug information
            mode(str): one of "sql", "json" or "jsonpickle"

        Return:
            StorageConfig: the storage configuration to be used

        Raises:
            ValueError: if the mode is not one of the supported modes
        """
        if mode == "sql":
            config = StorageConfig.getSQL(debug=debug)
        elif mode == "json":
            config = StorageConfig.getJSON()
        elif mode == "jsonpickle":
            config = StorageConfig.getJsonPickle(debug=debug)
        else:
            # ValueError is a subclass of Exception, so callers that
            # catch Exception keep working
            raise ValueError(f"invalid mode {mode}")
        config.cacheDirName = "dms"
        cachedir = config.getCachePath()
        config.profile = DMSStorage.profile
        config.withShowProgress = DMSStorage.withShowProgress
        if mode == "sql":
            # the SQL mode uses a single database file inside the cache directory
            config.cacheFile = f"{cachedir}/dms.db"
        return config

    @staticmethod
    def getScanDir():
        """
        get the scan/watch directory to be used

        The directory is created if it does not exist yet.

        Returns:
            str: the path to the scan directory
        """
        home = str(Path.home())
        scandir = f"{home}/Pictures/scans"
        os.makedirs(scandir, exist_ok=True)
        return scandir

    @staticmethod
    def getSqlDB():
        """
        get the SQlite database connection

        Return:
            SQLDB: a database opened on the cache file of the "sql" storage configuration
        """
        config = DMSStorage.getStorageConfig(mode="sql")
        # https://stackoverflow.com/a/48234567/1497139
        sqlDB = SQLDB(config.cacheFile, check_same_thread=False)
        return sqlDB

    @staticmethod
    def getDatetime(fullpath: str):
        """
        get the last modification time

        Args:
            fullpath(str): the path to get the datetime for

        Return:
            datetime: the modification time of the given path
        """
        timestamp = os.path.getmtime(fullpath)
        ftime = datetime.fromtimestamp(timestamp)
        return ftime

    @staticmethod
    def getTimeStr(fullpath: str):
        """
        get the last modification time as a string

        Args:
            fullpath(str): the path to get the time string for

        Return:
            str: the modification time formatted as YYYY-MM-DD HH:MM:SS
        """
        ftime = DMSStorage.getDatetime(fullpath)
        ftimestr = ftime.strftime("%Y-%m-%d %H:%M:%S")
        return ftimestr

    @staticmethod
    def fromCache(em: EntityManager):
        """
        initialize the given entity manager from its cache

        Args:
            em(EntityManager): the entity manager to initialize
        """
        if em.isCached():
            em.fromCache()
        else:
            # without a cache only SQL based managers get a database attached
            if em.config.mode is StoreMode.SQL:
                sqlDB = DMSStorage.getSqlDB()
                em.initSQLDB(sqlDB)

fromCache(em) staticmethod

initialize the given entity manager from its cache

Parameters:

Name Type Description Default
em(EntityManager)

the entity manager to initialize

required
Source code in scan/dms.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
@staticmethod
def fromCache(em: EntityManager):
    """
    initialize the given entity manager from its cache

    A cached manager is restored via its own fromCache(); otherwise a
    manager configured for StoreMode.SQL gets the DMS database attached.

    Args:
        em(EntityManager): the entity manager to initialize
    """
    if em.isCached():
        em.fromCache()
        return
    if em.config.mode is StoreMode.SQL:
        em.initSQLDB(DMSStorage.getSqlDB())

getDatetime(fullpath) staticmethod

get the last modification time

Parameters:

Name Type Description Default
fullpath(str)

the path to get the datetime for

required
Source code in scan/dms.py
169
170
171
172
173
174
175
176
177
178
179
@staticmethod
def getDatetime(fullpath: str):
    """
    get the last modification time of a path

    Args:
        fullpath(str): the path to get the datetime for

    Return:
        datetime: the modification time of the given path
    """
    return datetime.fromtimestamp(os.path.getmtime(fullpath))

getScanDir() staticmethod

get the scan/watch directory to be used

Returns:

Name Type Description
str

the path to the scan directory

Source code in scan/dms.py
146
147
148
149
150
151
152
153
154
155
156
157
@staticmethod
def getScanDir():
    """
    get the scan/watch directory to be used

    The directory below the user's home is created when it is missing.

    Returns:
        str: the path to the scan directory
    """
    scanPath = "%s/Pictures/scans" % Path.home()
    os.makedirs(scanPath, exist_ok=True)
    return scanPath

getSqlDB() staticmethod

get the SQlite database connection

Source code in scan/dms.py
159
160
161
162
163
164
165
166
167
@staticmethod
def getSqlDB():
    """
    get the SQlite database connection

    Return:
        SQLDB: a database opened on the cache file of the "sql" storage configuration
    """
    dbFile = DMSStorage.getStorageConfig(mode="sql").cacheFile
    # check_same_thread is switched off, see
    # https://stackoverflow.com/a/48234567/1497139
    return SQLDB(dbFile, check_same_thread=False)

getStorageConfig(debug=False, mode='sql') staticmethod

get the storageConfiguration

Parameters:

Name Type Description Default
debug(bool)

if True show debug information

required
mode(str)

sql or json

required
Return

StorageConfig: the storage configuration to be used

Source code in scan/dms.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@staticmethod
def getStorageConfig(debug: bool = False, mode="sql") -> StorageConfig:
    """
    get the storageConfiguration

    Args:
        debug(bool): if True show debug information
        mode(str): one of "sql", "json" or "jsonpickle"

    Return:
        StorageConfig: the storage configuration to be used

    Raises:
        ValueError: if the mode is not one of the supported modes
    """
    if mode == "sql":
        config = StorageConfig.getSQL(debug=debug)
    elif mode == "json":
        config = StorageConfig.getJSON()
    elif mode == "jsonpickle":
        config = StorageConfig.getJsonPickle(debug=debug)
    else:
        # ValueError is a subclass of Exception, so callers that
        # catch Exception keep working
        raise ValueError(f"invalid mode {mode}")
    config.cacheDirName = "dms"
    cachedir = config.getCachePath()
    config.profile = DMSStorage.profile
    config.withShowProgress = DMSStorage.withShowProgress
    if mode == "sql":
        # the SQL mode uses a single database file inside the cache directory
        config.cacheFile = f"{cachedir}/dms.db"
    return config

getTimeStr(fullpath) staticmethod

get the last modification time as a formatted string (YYYY-MM-DD HH:MM:SS)

Parameters:

Name Type Description Default
fullpath(str)

the path to get the time string for

required
Source code in scan/dms.py
181
182
183
184
185
186
187
188
189
190
191
@staticmethod
def getTimeStr(fullpath: str):
    """
    get the last modification time as a string

    Args:
        fullpath(str): the path to get the time string for

    Return:
        str: the modification time formatted as YYYY-MM-DD HH:MM:SS
    """
    return DMSStorage.getDatetime(fullpath).strftime("%Y-%m-%d %H:%M:%S")

Document

Bases: JSONAble

a document consists of one or more files in the filesystem or a wikipage - the name is the pagetitle or the filename without extension

types then has the list of available file types, e.g. "pdf,txt". For single-page documents the Document is somewhat redundant to the Page concept.

Source code in scan/dms.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
class Document(JSONAble):
    """
    a document consists of one or more files in the filesystem
    or a wikipage - the name is the pagetitle
    or the filename without extension

    types then has the list of available file types e.g. "pdf,txt"
    for single page Documents the document is somewhat redundant to the Page concept
    """

    @classmethod
    def getSamples(cls):
        """
        get a list of sample records showing the fields of a Document

        Return:
            list: a list with one sample dict
        """
        samplesLOD = [
            {
                "archiveName": "bitplan-scan",
                "folderPath": "",
                # TODO: fullpath, filename, basename and timestampStr not needed
                "fullpath": "",
                "fileName": "",
                "basename": "",
                "timestampStr": "",
                "pageTitle": "",
                "categories": "",
                "topic": "",
                "url": "http://capri.bitplan.com/bitplan/scan/2019/",
                "created": datetime(2019, 2, 27, 10, 7, 56),
                "size": 15,
                "lastModified": datetime(2019, 2, 27, 10, 7, 56),
                "name": "2019",
                "types": "pdf",
                "ocrText": "",
            }
        ]
        return samplesLOD

    def __init__(self):
        """
        construct me
        """
        pass

    def fromDict(self, record):
        """
        overwrite the from Dict

        Args:
            record(dict): the record handed to the superclass fromDict
        """
        super().fromDict(record)

    def fromFile(self, folderPath, file, local=False, withOcr=False):
        """
        initialize me from the given file in the given folder

        Args:
            folderPath(str): the directory
            file(str): the file
            local(bool): passed to Folder.getFullpath - True if the folderPath is a local path
            withOcr(bool): if true get the OCRText
        """
        self.folderPath = folderPath
        self.name = file
        self.fullpath = f"{Folder.getFullpath(self.folderPath,local)}/{file}"
        self.size = os.path.getsize(self.fullpath)
        self.lastModified = DMSStorage.getDatetime(self.fullpath)
        # the creation time is not determined separately
        self.created = self.lastModified
        self.timestampStr = DMSStorage.getTimeStr(self.fullpath)
        self.fileName = Path(self.fullpath).name
        # getOcrText and the samples use the lowercase "basename" spelling;
        # previously only "baseName" was set here, which made
        # fromFile(withOcr=True) fail with an AttributeError
        self.basename = Path(self.fullpath).stem
        # kept for backward compatibility with code reading the old attribute
        self.baseName = self.basename
        self.pageTitle = f"{self.baseName}"

        self.categories = f"{datetime.now().year}"
        self.topic = "OCRDocument"
        if withOcr:
            self.getOcrText()

    def __str__(self):
        """
        get a short text representation showing my fileName and ocrText

        Fields that have not been set yet are shown as None.
        """
        # the field list is stored on the instance as before
        self.fields = ["fileName", "ocrText"]
        fieldTexts = [
            "%s=%s" % (fieldname, getattr(self, fieldname, None))
            for fieldname in self.fields
        ]
        return "Upload:" + ",".join(fieldTexts)

    def getPDFText(self):
        """
        get my PDF Text

        Return:
            str: the text PDFMiner extracts from my file, or None if my
            fullpath does not end with .pdf (case-insensitive)
        """
        pdfText = None
        if self.fullpath.lower().endswith(".pdf"):
            pdfText = PDFMiner.getPDFText(self.fullpath)
        return pdfText

    def readTextFromFile(self, fileName: str) -> str:
        """
        read text from the given fileName

        The file is read with the default text encoding first; if that fails
        with a UnicodeDecodeError the encoding suggested by UnicodeDammit is
        used (utf-8 if there is no suggestion).

        Args:
            fileName(str): the path of the file to read

        Return:
            str: the text content of the file
        """
        try:
            with open(fileName, "r") as textFile:
                return textFile.read()
        except UnicodeDecodeError as _ude:
            with open(fileName, "rb") as file:
                content = file.read()
            suggestion = UnicodeDammit(content)
            encoding = suggestion.original_encoding
            if encoding is None:
                encoding = "utf-8"
            # a decoding failure propagates to the caller
            return content.decode(encoding)

    def getOcrText(self):
        """
        get the OCR text

        The text is looked up in the .ocr directory next to my file, either
        as <basename>.txt or as page files <basename>_p001.txt,
        <basename>_p002.txt, ... which are concatenated. If no OCR text is
        found the PDF text is used.

        Return:
            str: the text found (also stored in self.ocrText) - may be None
        """
        parent = Path(self.fullpath).parent.absolute()
        ocrPath = f"{parent}/.ocr"
        # fall back to the file stem if basename was never set or is empty
        basename = getattr(self, "basename", None) or Path(self.fullpath).stem
        self.ocrText = None
        if os.path.isdir(ocrPath):
            ocrFileName = f"{ocrPath}/{basename}.txt"
            if os.path.isfile(ocrFileName):
                self.ocrText = self.readTextFromFile(ocrFileName)
            else:
                page = 1
                maxPages = 1000
                pageFileName = f"{ocrPath}/{basename}_p{page:03d}.txt"
                if os.path.isfile(pageFileName):
                    pageText = self.readTextFromFile(pageFileName)
                    if pageText is not None:
                        self.ocrText = pageText
                        # append the following pages until one is missing
                        for page in range(2, maxPages):
                            pageFileName = f"{ocrPath}/{basename}_p{page:03d}.txt"
                            if not os.path.isfile(pageFileName):
                                break
                            nextPage = self.readTextFromFile(pageFileName)
                            if nextPage is not None:
                                self.ocrText += nextPage
        if self.ocrText is None:
            self.ocrText = self.getPDFText()
        return self.ocrText

    def uploadFile(self, wikiId):
        """
        call back to upload my file and my wiki page to the given wiki

        Args:
            wikiId(str): the id of the target wiki
        """
        pageContent = self.getContent()
        # existing files and pages are always overwritten
        ignoreExists = True
        wikipush = WikiPush(fromWikiId=None, toWikiId=wikiId, login=True)
        description = f"scanned at {self.timestampStr}"
        msg = f"uploading {self.pageTitle} ({self.fileName}) to {wikiId} ... "
        files = [self.fullpath]
        wikipush.upload(files, force=ignoreExists)
        pageToBeEdited = wikipush.toWiki.getPage(self.pageTitle)
        if (not pageToBeEdited.exists) or ignoreExists:
            pageToBeEdited.edit(pageContent, description)
            wikipush.log(msg + "✅")

    def getContent(self):
        """
        get my content

        Return:
            str: the content of the wikipage
        """
        # one category link per comma separated category, one per line
        wikicats = "\n".join(
            "[[Category:%s]]" % category for category in self.categories.split(",")
        )
        if self.fileName.endswith(".pdf"):
            template = """= pdf pages =
<pdf>%s</pdf>
= text =
<pre>%s</pre>
= pdf =
[[File:%s]]
%s
<headertabs/>
"""
            pageContent = template % (
                self.fileName,
                self.ocrText,
                self.fileName,
                wikicats,
            )
        else:
            template = """[[File:%s]]
%s
<headertabs/>"""
            pageContent = template % (self.fileName, wikicats)

        return pageContent

__init__()

construct me

Source code in scan/dms.py
244
245
246
247
248
def __init__(self):
    """
    construct me

    No attributes are set here.
    """
    pass

fromDict(record)

overwrite the from Dict

Source code in scan/dms.py
250
251
252
253
254
255
def fromDict(self, record):
    """
    overwrite the from Dict - delegates to the superclass implementation

    Args:
        record(dict): the record handed to the superclass fromDict
    """
    super().fromDict(record)

fromFile(folderPath, file, local=False, withOcr=False)

Parameters:

Name Type Description Default
folderPath(str)

the directory

required
file(str)

the file

required
withOcr(bool)

if true get the OCRText

required
Source code in scan/dms.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def fromFile(self, folderPath, file, local=False, withOcr=False):
    """
    initialize me from the given file in the given folder

    Args:
        folderPath(str): the directory
        file(str): the file
        local(bool): passed to Folder.getFullpath - True if the folderPath is a local path
        withOcr(bool): if true get the OCRText
    """
    self.folderPath = folderPath
    self.name = file
    self.fullpath = f"{Folder.getFullpath(self.folderPath,local)}/{file}"
    self.size = os.path.getsize(self.fullpath)
    self.lastModified = DMSStorage.getDatetime(self.fullpath)
    # the creation time is not determined separately
    self.created = self.lastModified
    self.timestampStr = DMSStorage.getTimeStr(self.fullpath)
    self.fileName = Path(self.fullpath).name
    # getOcrText and the samples use the lowercase "basename" spelling;
    # previously only "baseName" was set here, which made
    # fromFile(withOcr=True) fail with an AttributeError
    self.basename = Path(self.fullpath).stem
    # kept for backward compatibility with code reading the old attribute
    self.baseName = self.basename
    self.pageTitle = f"{self.baseName}"

    self.categories = f"{datetime.now().year}"
    self.topic = "OCRDocument"
    if withOcr:
        self.getOcrText()

getContent()

get my content

Return

str: the content of the wikipage

Source code in scan/dms.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
    def getContent(self):
        """
        get my content

        Return:
            str: the content of the wikipage
        """
        wikicats = ""
        delim = ""
        for category in self.categories.split(","):
            wikicats += "%s[[Category:%s]]" % (delim, category)
            delim = "\n"
        if self.fileName.endswith(".pdf"):
            template = """= pdf pages =
<pdf>%s</pdf>
= text =
<pre>%s</pre>
= pdf =
[[File:%s]]
%s
<headertabs/>
"""
            pageContent = template % (
                self.fileName,
                self.ocrText,
                self.fileName,
                wikicats,
            )
        else:
            template = """[[File:%s]]
%s
<headertabs/>"""
            pageContent = template % (self.fileName, wikicats)

        return pageContent

getOcrText()

get the OCR text

Source code in scan/dms.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def getOcrText(self):
    """
    get the OCR text

    The text is looked up in the .ocr directory next to my file, either as
    <basename>.txt or as page files <basename>_p001.txt, <basename>_p002.txt,
    ... which are concatenated. If no OCR text is found the PDF text is used.

    Return:
        str: the text found (also stored in self.ocrText) - may be None
    """
    ocrPath = f"{Path(self.fullpath).parent.absolute()}/.ocr"
    # fromFile stores the stem as "baseName" while the samples use "basename";
    # accept both and fall back to the stem of my file instead of raising
    # an AttributeError
    basename = (
        getattr(self, "basename", None)
        or getattr(self, "baseName", None)
        or Path(self.fullpath).stem
    )
    self.ocrText = None
    if os.path.isdir(ocrPath):
        ocrFileName = f"{ocrPath}/{basename}.txt"
        if os.path.isfile(ocrFileName):
            self.ocrText = self.readTextFromFile(ocrFileName)
        else:
            maxPages = 1000
            pageTexts = []
            # collect the page files until the first missing page number
            for page in range(1, maxPages):
                pageFileName = f"{ocrPath}/{basename}_p{page:03d}.txt"
                if not os.path.isfile(pageFileName):
                    break
                pageText = self.readTextFromFile(pageFileName)
                if pageText is None:
                    if page == 1:
                        # no usable first page - do not look any further
                        break
                    continue
                pageTexts.append(pageText)
            if pageTexts:
                self.ocrText = "".join(pageTexts)
    if self.ocrText is None:
        self.ocrText = self.getPDFText()
    return self.ocrText

getPDFText()

get my PDF Text

Source code in scan/dms.py
290
291
292
293
294
295
296
297
def getPDFText(self):
    """
    get my PDF Text

    Return:
        str: the text PDFMiner extracts from my file, or None if my
        fullpath does not end with .pdf (case-insensitive)
    """
    if not self.fullpath.lower().endswith(".pdf"):
        return None
    return PDFMiner.getPDFText(self.fullpath)

readTextFromFile(fileName)

read text from the given fileName

Source code in scan/dms.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def readTextFromFile(self, fileName: str) -> str:
    """
    read text from the given fileName

    The file is read with the default text encoding first; if that fails
    with a UnicodeDecodeError the encoding suggested by UnicodeDammit is
    used (utf-8 if there is no suggestion).

    Args:
        fileName(str): the path of the file to read

    Return:
        str: the text content of the file
    """
    try:
        with open(fileName, "r") as textFile:
            return textFile.read()
    except UnicodeDecodeError as _ude:
        with open(fileName, "rb") as binaryFile:
            rawBytes = binaryFile.read()
            detected = UnicodeDammit(rawBytes).original_encoding
            # a decoding failure propagates to the caller
            return rawBytes.decode("utf-8" if detected is None else detected)

uploadFile(wikiId)

callback: upload this document's file and its wiki page to the given wiki

Source code in scan/dms.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def uploadFile(self, wikiId):
    """
    call back to upload my file and my wiki page to the given wiki

    Args:
        wikiId(str): the id of the target wiki
    """
    content = self.getContent()
    # existing files and pages are always overwritten
    overwrite = True
    pusher = WikiPush(fromWikiId=None, toWikiId=wikiId, login=True)
    editSummary = f"scanned at {self.timestampStr}"
    logMsg = f"uploading {self.pageTitle} ({self.fileName}) to {wikiId} ... "
    pusher.upload([self.fullpath], force=overwrite)
    wikiPage = pusher.toWiki.getPage(self.pageTitle)
    if (not wikiPage.exists) or overwrite:
        wikiPage.edit(content, editSummary)
        pusher.log(logMsg + "✅")

DocumentManager

Bases: EntityManager

manager for Documents

Source code in scan/dms.py
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
class DocumentManager(EntityManager):
    """
    manager for Documents
    """

    def __init__(self, mode="sql", debug=False):
        """
        constructor

        Args:
            mode(str): the storage mode handed to DMSStorage.getStorageConfig
            debug(bool): debug flag handed to the storage configuration and the superclass
        """
        pluralName = "documents"
        storageConfig = DMSStorage.getStorageConfig(mode=mode, debug=debug)
        # positional arguments in the original order; the trailing
        # labels are the names of the former local variables
        super().__init__(
            "document",  # name
            "Document",  # entityName
            pluralName,  # entityPluralName
            pluralName,  # listName
            Document,  # clazz
            "document",  # tableName
            "url",  # primaryKey
            storageConfig,  # config
            True,  # handleInvalidListTypes
            True,  # filterInvalidListTypes
            debug,
        )

    @staticmethod
    def getInstance(mode="sql"):
        """
        create a DocumentManager for the given mode and initialize it via DMSStorage.fromCache

        Args:
            mode(str): the storage mode

        Return:
            DocumentManager: the initialized manager
        """
        manager = DocumentManager(mode=mode)
        DMSStorage.fromCache(manager)
        return manager

__init__(mode='sql', debug=False)

constructor

Source code in scan/dms.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
def __init__(self, mode="sql", debug=False):
    """
    constructor

    Args:
        mode(str): the storage mode handed to DMSStorage.getStorageConfig
        debug(bool): debug flag handed to the storage configuration and the superclass
    """
    pluralName = "documents"
    storageConfig = DMSStorage.getStorageConfig(mode=mode, debug=debug)
    # positional arguments in the original order; the trailing
    # labels are the names of the former local variables
    super().__init__(
        "document",  # name
        "Document",  # entityName
        pluralName,  # entityPluralName
        pluralName,  # listName
        Document,  # clazz
        "document",  # tableName
        "url",  # primaryKey
        storageConfig,  # config
        True,  # handleInvalidListTypes
        True,  # filterInvalidListTypes
        debug,
    )

Folder

Bases: JSONAble

a Folder might be a filesystem folder or a category in a wiki

Source code in scan/dms.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
class Folder(JSONAble):
    """
    a Folder might be a filesystem folder or a category in a wiki
    """

    def __init__(self):
        """
        Constructor
        """

    @classmethod
    def getSamples(cls):
        """
        get a list of sample records showing the fields of a Folder

        Return:
            list: a list with one sample dict
        """
        samplesLOD = [
            {
                "archiveName": "bitplan-scan",
                "url": "http://capri.bitplan.com/bitplan/scan/2019/",
                "fileCount": 15,
                "lastModified": datetime(2019, 2, 27, 10, 7, 56),
                "created": datetime(2019, 2, 27, 10, 7, 56),
                "name": "2019",
                "path": "/bitplan/scan/2019",
            }
        ]
        return samplesLOD

    @classmethod
    def getPrefix(cls):
        """
        get the path prefix for this platform (if any)

        Return:
            str: the prefix e.g. /Volumes on Darwin
        """
        if sys.platform == "darwin":
            prefix = "/Volumes"
        else:
            prefix = ""
        return prefix

    @staticmethod
    def getFullpath(folderPath: str, local: bool = False):
        """
        get the full path as accessible on my platform

        Args:
           folderPath(str): the path of the folder
           local(bool): True if the path is for a local folder

        Return:
            str: the full path of the folder
        """
        if local:
            # local paths are used as given
            fullPath = folderPath
        else:
            fullPath = f"{Folder.getPrefix()}{folderPath}"
        return fullPath

    @classmethod
    def getRelpath(cls, folderPath: str) -> str:
        """
        get the relative path as accessible on my platform

        Args:
           folderPath(str): the path of the folder

        Return:
            str: the folder path without the leading platform prefix
        """
        prefix = cls.getPrefix()
        if prefix and folderPath.startswith(prefix):
            # strip only the leading prefix - str.replace would also remove
            # later occurrences, e.g. in /Volumes/scan/Volumes/x
            relbase = folderPath[len(prefix) :]
        else:
            relbase = folderPath
        return relbase

    def getFiles(self, extension=".pdf"):
        """
        get all files with the given extension

        Files starting with "._" are skipped.

        Args:
            extension(str): the extension to search for

        Return:
            list: the files with the given extension
        """
        files = []
        fullPath = Folder.getFullpath(self.path)
        for file in os.listdir(fullPath):
            if file.endswith(extension) and not file.startswith("._"):
                files.append(file)
        return files

    def getFileDocuments(self):
        """
        get all documents for the OCRDocument files in this folder

        Return:
            list: the list of documents
        """
        files = self.getFiles()
        documents = self.getDocuments(files)
        return documents

    def getDocuments(self, files, withOcr=False):
        """
        get the documents for this folder based on the files from my listdir

        Only files ending with .pdf are considered. A file that causes an
        exception is logged and left out of the result.

        Args:
            files(list): the file names to create documents for
            withOcr(bool): if True get the OCR text for each document

        Return:
            list: the list of documents
        """
        documentList = []
        msg = f"getting {len(files)} documents for {self.path}"
        Logger.log(msg)
        for file in files:
            try:
                if file.endswith(".pdf"):
                    doc = Document()
                    doc.archiveName = self.archiveName
                    doc.url = f"http://{self.archive.server}{self.path}/{file}"
                    doc.fromFile(self.path, file, withOcr=withOcr)
                    documentList.append(doc)
            except Exception as e:
                # best effort: log the problem and continue with the next file
                Logger.logException(e)
        return documentList

    def refreshDocuments(self):
        """
        refresh the documents in this folder

        The OCR text is fetched for every document of this folder.
        """
        doclist = self.getFileDocuments()
        for doc in doclist:
            doc.getOcrText()

__init__()

Constructor

Source code in scan/dms.py
409
410
411
412
def __init__(self):
    """
    Constructor

    No attributes are set here.
    """

getDocuments(files, withOcr=False)

get the documents for this folder based on the files from my listdir

Source code in scan/dms.py
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
def getDocuments(self, files, withOcr=False):
    """
    get the documents for this folder based on the files from my listdir

    Only files ending with .pdf are considered. A file that causes an
    exception is logged and left out of the result.

    Args:
        files(list): the file names to create documents for
        withOcr(bool): if True get the OCR text for each document

    Return:
        list: the list of documents
    """
    Logger.log(f"getting {len(files)} documents for {self.path}")
    docs = []
    for fileName in files:
        try:
            if not fileName.endswith(".pdf"):
                continue
            document = Document()
            document.archiveName = self.archiveName
            document.url = f"http://{self.archive.server}{self.path}/{fileName}"
            document.fromFile(self.path, fileName, withOcr=withOcr)
            docs.append(document)
        except Exception as ex:
            # best effort: log the problem and continue with the next file
            Logger.logException(ex)
    return docs

getFileDocuments()

get all documents for the OCRDocument files in this folder

Return

list: the list of documents

Source code in scan/dms.py
496
497
498
499
500
501
502
503
504
505
def getFileDocuments(self):
    """
    get all documents for the OCRDocument files in this folder

    Return:
        list: the list of documents
    """
    return self.getDocuments(self.getFiles())

getFiles(extension='.pdf')

get all files with the given extension

Parameters:

Name Type Description Default
extension(str)

the extension to search for

required
Return

list: the files with the given extension

Source code in scan/dms.py
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
def getFiles(self, extension=".pdf"):
    """
    get all files with the given extension

    Files starting with "._" are skipped.

    Args:
        extension(str): the extension to search for

    Return:
        list: the files with the given extension
    """
    folderDir = Folder.getFullpath(self.path)
    return [
        entry
        for entry in os.listdir(folderDir)
        if entry.endswith(extension) and not entry.startswith("._")
    ]

getFullpath(folderPath, local=False) staticmethod

get the full path as accessible on my platform

Parameters:

Name Type Description Default
folderPath(str)

the path of the folder

required
local(bool)

True if the path is for a local folder

required
Return

str: the full path of the folder

Source code in scan/dms.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
@staticmethod
def getFullpath(folderPath: str, local: bool = False):
    """
    get the full path as accessible on my platform

    Args:
       folderPath(str): the path of the folder
       local(bool): True if the path is for a local folder

    Return:
        str: the folder path itself for local folders, otherwise the
        folder path with the platform prefix in front
    """
    if not local:
        return f"{Folder.getPrefix()}{folderPath}"
    return folderPath

getPrefix() classmethod

get the path prefix for this platform (if any)

Return

str: the prefix e.g. /Volumes on Darwin

Source code in scan/dms.py
429
430
431
432
433
434
435
436
437
438
439
440
441
@classmethod
def getPrefix(cls):
    """
    get the path prefix for this platform (if any)

    Return:
        str: the prefix e.g. /Volumes on Darwin
    """
    return "/Volumes" if sys.platform == "darwin" else ""

getRelpath(folderPath) classmethod

get the relative path as accessible on my platform

Parameters:

Name Type Description Default
folderPath(str)

the path of the folder

required
Return

str: the relative path of the folder

Source code in scan/dms.py
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
@classmethod
def getRelpath(cls, folderPath: str) -> str:
    """
    get the relative path as accessible on my platform

    Args:
       folderPath(str): the path of the folder

    Return:
        str: the folder path without the leading platform prefix
    """
    prefix = cls.getPrefix()
    if prefix and folderPath.startswith(prefix):
        # strip only the leading prefix - str.replace would also remove
        # later occurrences, e.g. in /Volumes/scan/Volumes/x
        relbase = folderPath[len(prefix) :]
    else:
        relbase = folderPath
    return relbase

refreshDocuments()

refresh the documents in this folder

Source code in scan/dms.py
526
527
528
529
530
531
532
533
534
def refreshDocuments(self):
    """
    refresh the documents in this folder

    The OCR text is fetched for every document of this folder.
    """
    for document in self.getFileDocuments():
        document.getOcrText()

FolderManager

Bases: EntityManager

manager for Folders

Source code in scan/dms.py
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
class FolderManager(EntityManager):
    """
    manager for Folder entities
    """

    def __init__(self, mode="sql", debug=False):
        """
        constructor

        Args:
            mode: the storage mode handed to DMSStorage.getStorageConfig
            debug: debug flag handed to the storage config and the EntityManager
        """
        name = "folder"
        entityName = "Folder"
        entityPluralName = "folders"
        listName = entityPluralName
        clazz = Folder
        tableName = name
        config = DMSStorage.getStorageConfig(mode=mode, debug=debug)
        handleInvalidListTypes = True
        filterInvalidListTypes = True
        primaryKey = None
        super().__init__(
            name,
            entityName,
            entityPluralName,
            listName,
            clazz,
            tableName,
            primaryKey,
            config,
            handleInvalidListTypes,
            filterInvalidListTypes,
            debug,
        )

    @staticmethod
    def getInstance(mode="sql"):
        """
        create a FolderManager for the given mode and pass it to DMSStorage.fromCache

        Args:
            mode: the storage mode

        Return:
            FolderManager: the new manager
        """
        fm = FolderManager(mode=mode)
        DMSStorage.fromCache(fm)
        return fm

    def getDocumentRecords(self, archiveName, folderPath):
        """
        get the document records

        Args:
            archiveName: value matched against the archiveName column
            folderPath: value matched against the folderPath column

        Return:
            the result of the query on the document table
        """
        sqlDB = SQLDB(self.getCacheFile())
        sqlQuery = "SELECT * FROM document WHERE archiveName=(?) AND folderPath=(?)"
        params = (
            archiveName,
            folderPath,
        )
        dictList = sqlDB.query(sqlQuery, params)
        return dictList

    def getFolder(self, archive, folderPath: str):
        """
        get the folder for the given archive and folderPath

        Args:
            archive: the archive (its name attribute is used for the lookup)
            folderPath: the path of the folder

        Return:
            Folder: the matching folder with its archive attribute set,
            or None if no folder record matches

        Raises:
            Exception: if more than one folder record matches
        """
        sqlDB = SQLDB(self.getCacheFile())
        sqlQuery = "SELECT * FROM folder WHERE archiveName=(?) AND path=(?)"
        archiveName = archive.name
        params = (
            archiveName,
            folderPath,
        )
        records = sqlDB.query(sqlQuery, params)
        if len(records) > 1:
            msg = f"{len(records)} folders found for {archiveName}:{folderPath} - there should be only one"
            raise Exception(msg)
        if len(records) == 0:
            # nothing found: return None instead of failing on None.archive
            return None
        folder = Folder()
        folder.fromDict(records[0])
        folder.archive = archive
        return folder

    def refreshFolder(self, archive, folderPath):
        """
        refresh the documents of the folder for the given archive and folderPath

        Args:
            archive: the archive
            folderPath: the path of the folder

        Raises:
            Exception: if no folder record matches
        """
        folder = self.getFolder(archive, folderPath)
        if folder is None:
            msg = f"no folder found for {archive.name}:{folderPath}"
            raise Exception(msg)
        folder.refreshDocuments()

__init__(mode='sql', debug=False)

constructor

Source code in scan/dms.py
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
def __init__(self, mode="sql", debug=False):
    """
    constructor

    Args:
        mode: the storage mode handed to DMSStorage.getStorageConfig
        debug: debug flag handed to the storage config and the EntityManager
    """
    table = "folder"
    plural = "folders"
    storageConfig = DMSStorage.getStorageConfig(mode=mode, debug=debug)
    # the trailing comments name what each positional argument stands for
    super().__init__(
        table,  # name
        "Folder",  # entityName
        plural,  # entityPluralName
        plural,  # listName
        Folder,  # clazz
        table,  # tableName
        None,  # primaryKey
        storageConfig,  # config
        True,  # handleInvalidListTypes
        True,  # filterInvalidListTypes
        debug,
    )

getDocumentRecords(archiveName, folderPath)

get the document records

Source code in scan/dms.py
612
613
614
615
616
617
618
619
620
621
622
623
def getDocumentRecords(self, archiveName, folderPath):
    """
    get the document records

    Args:
        archiveName: value matched against the archiveName column
        folderPath: value matched against the folderPath column

    Return:
        the result of the query on the document table
    """
    sqlQuery = "SELECT * FROM document WHERE archiveName=(?) AND folderPath=(?)"
    # both values are bound as query parameters
    return SQLDB(self.getCacheFile()).query(sqlQuery, (archiveName, folderPath))

getFolder(archive, folderPath)

get the folder for the given archive and folderPath

Parameters:

Name Type Description Default
archive

the archive

required
folderPath str

the path of the folder

required
Source code in scan/dms.py
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
def getFolder(self, archive, folderPath: str):
    """
    get the folder for the given archive and folderPath

    Args:
        archive: the archive (its name attribute is used for the lookup)
        folderPath: the path of the folder

    Return:
        Folder: the matching folder with its archive attribute set,
        or None if no folder record matches

    Raises:
        Exception: if more than one folder record matches
    """
    sqlDB = SQLDB(self.getCacheFile())
    sqlQuery = "SELECT * FROM folder WHERE archiveName=(?) AND path=(?)"
    archiveName = archive.name
    params = (
        archiveName,
        folderPath,
    )
    records = sqlDB.query(sqlQuery, params)
    if len(records) > 1:
        msg = f"{len(records)} folders found for {archiveName}:{folderPath} - there should be only one"
        raise Exception(msg)
    if len(records) == 0:
        # nothing found: return None instead of failing on None.archive
        return None
    folder = Folder()
    folder.fromDict(records[0])
    folder.archive = archive
    return folder

refreshFolder(archive, folderPath)

refresh the documents of the folder for the given archive and folderPath

Parameters:

Name Type Description Default
archive

the archive (the object itself, not its name; it is passed on to getFolder)

required
folderPath

the path of the folder

required
Source code in scan/dms.py
651
652
653
654
655
656
657
658
659
660
661
def refreshFolder(self, archive, folderPath):
    """
    refresh the documents of the folder for the given archive and folderPath

    Args:
        archive: the archive (passed on to getFolder)
        folderPath: the path of the folder
    """
    self.getFolder(archive, folderPath).refreshDocuments()

Wiki

Bases: object

Semantic Mediawiki access proxy

Source code in scan/dms.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class Wiki(object):
    """
    Semantic Mediawiki access proxy
    """

    @staticmethod
    def getSMW(wikiId: str):
        """
        get the semantic mediawiki client with the given wikiId

        Args:
            wikiId: the wiki id of the client

        Return:
            SMWClient: the SMWClient with the given id
        """
        wikiClient = Wiki.get(wikiId)
        smw = SMWClient(wikiClient.getSite())
        return smw

    @staticmethod
    def get(wikiId: str):
        """
        get the Wiki Client with the given wikiId

        The ini file is checked first (see checkIniFile) and the client
        is logged in before it is returned.

        Args:
            wikiId: the wiki id of the client

        Return:
            WikiClient: the WikiClient with the given id
        """
        Wiki.checkIniFile(wikiId)
        wikiClient = WikiClient.ofWikiId(wikiId)
        wikiClient.login()
        return wikiClient

    @staticmethod
    def inPublicCI():
        """
        are we running in a public Continuous Integration Environment?

        Return:
            bool: True if the current user name is "travis" or "runner"
        """
        return getpass.getuser() in ["travis", "runner"]

    @staticmethod
    def checkIniFile(wikiId: str, save=None):
        """
        check the ini file for the given wikiId

        Args:
            wikiId(str): the wiki id of the wiki to check
            save(bool): True if a new ini file should be created e.g. for test purposes
                        if not set save is True if we are running in a public continuous integration environment

        Raises:
            Exception: if there is no ini file and wikiId is not "wiki"
        """
        if save is None:
            save = Wiki.inPublicCI()
        iniFile = WikiUser.iniFilePath(wikiId)
        if os.path.isfile(iniFile):
            return
        # only the wikiId "wiki" has built-in default settings
        if wikiId != "wiki":
            raise Exception(
                f"wikiId {wikiId} is not configured in $HOME/.mediawiki-japi"
            )
        wikiDict = {
            "wikiId": wikiId,
            "email": "noreply@nouser.com",
            "url": "https://wiki.bitplan.com",
            "scriptPath": "/",
            "version": "MediaWiki 1.35.1",
        }
        wikiUser = WikiUser.ofDict(wikiDict, lenient=True)
        if save:
            wikiUser.save()

checkIniFile(wikiId, save=None) staticmethod

check the ini file for the given wikiId

Parameters:

Name Type Description Default
wikiId(str)

the wiki id of the wiki to check

required
save(bool)

True if a new ini file should be created e.g. for test purposes if not set save is True if we are running in a public continuous integration environment

None
Source code in scan/dms.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@staticmethod
def checkIniFile(wikiId: str, save=None):
    """
    check the ini file for the given wikiId

    Args:
        wikiId(str): the wiki id of the wiki to check
        save(bool): True if a new ini file should be created e.g. for test purposes
                    if not set save is True if we are running in a public continuous integration environment

    Raises:
        Exception: if there is no ini file and wikiId is not "wiki"
    """
    if save is None:
        save = Wiki.inPublicCI()
    iniFile = WikiUser.iniFilePath(wikiId)
    if os.path.isfile(iniFile):
        return
    # only the wikiId "wiki" has built-in default settings
    if wikiId != "wiki":
        raise Exception(
            f"wikiId {wikiId} is not configured in $HOME/.mediawiki-japi"
        )
    wikiDict = {
        "wikiId": wikiId,
        "email": "noreply@nouser.com",
        "url": "https://wiki.bitplan.com",
        "scriptPath": "/",
        "version": "MediaWiki 1.35.1",
    }
    wikiUser = WikiUser.ofDict(wikiDict, lenient=True)
    if save:
        wikiUser.save()

get(wikiId) staticmethod

get the Wiki Client with the given wikiId

Parameters:

Name Type Description Default
wikiId str

the wiki id of the client

required
Return

WikiClient: the WikiClient with the given id

Source code in scan/dms.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@staticmethod
def get(wikiId: str):
    """
    get the Wiki Client with the given wikiId

    The ini file is checked first and the client is logged in
    before it is returned.

    Args:
        wikiId: the wiki id of the client

    Return:
        WikiClient: the WikiClient with the given id
    """
    Wiki.checkIniFile(wikiId)
    client = WikiClient.ofWikiId(wikiId)
    client.login()
    return client

getSMW(wikiId) staticmethod

get the semantic mediawiki client with the given wikiId

Parameters:

Name Type Description Default
wikiId str

the wiki id of the client

required
Return

SMWClient: the SMWClient with the given id

Source code in scan/dms.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@staticmethod
def getSMW(wikiId: str):
    """
    get the semantic mediawiki client with the given wikiId

    Args:
        wikiId: the wiki id of the client

    Return:
        SMWClient: the SMWClient with the given id
    """
    # the SMWClient is built from the site of the wiki client
    return SMWClient(Wiki.get(wikiId).getSite())

inPublicCI() staticmethod

are we running in a public Continuous Integration Environment?

Source code in scan/dms.py
69
70
71
72
73
74
@staticmethod
def inPublicCI():
    """
    are we running in a public Continuous Integration Environment?

    Return:
        bool: True if the current user name is "travis" or "runner"
    """
    currentUser = getpass.getuser()
    return currentUser in ("travis", "runner")

entity_view

Created on 2023-11-17

@author: wf

EntityManagerView

a view for a given entity manager

Source code in scan/entity_view.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class EntityManagerView:
    """
    a view for a given entity manager
    """

    def __init__(self, em: EntityManager):
        """
        constructor

        Args:
            em(EntityManager): the entity manager to show
        """
        self.em = em
        self.setup_view()

    def setup_view(self):
        """
        set up my view elements
        """
        self.lod_grid = ListOfDictsGrid()

    def linkColumn(self, name, record, formatWith=None, formatTitleWith=None):
        """
        replace the column with the given name with a link

        Args:
            name: the key of the column in the record
            record(dict): the record to modify in place
            formatWith: optional %-format string applied to the value to get the link url
            formatTitleWith: optional %-format string applied to the value to get the link title
        """
        if name not in record:
            return
        value = record[name]
        if value is None:
            # a missing value is shown as an empty string, not as a link
            record[name] = ""
            return
        lurl = value if formatWith is None else formatWith % value
        title = value if formatTitleWith is None else formatTitleWith % value
        record[name] = Link.create(lurl, title)

    def defaultRowHandler(self, row):
        """
        default row handler: turn the "url" column of the row into a link

        Args:
            row(dict): the row to modify in place
        """
        self.linkColumn("url", row, formatWith="%s")

    def show(self, rowHandler=None, lodKeyHandler=None):
        """
        show my given entity manager

        Args:
            rowHandler: optional callable applied to each row dict;
                defaults to defaultRowHandler
            lodKeyHandler: optional callable that receives the list of keys
        """
        records = self.em.getList()
        if len(records) > 0:
            lodKeys = list(records[0].getJsonTypeSamples()[0].keys())
        else:
            lodKeys = ["url"]
        if lodKeyHandler is not None:
            lodKeyHandler(lodKeys)
        # work on copies so the handlers do not modify the entities
        dictList = [vars(d).copy() for d in records]
        if rowHandler is None:
            rowHandler = self.defaultRowHandler
        for row in dictList:
            rowHandler(row)
        self.lod_grid.load_lod(dictList)

linkColumn(name, record, formatWith=None, formatTitleWith=None)

replace the column with the given name with a link

Source code in scan/entity_view.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def linkColumn(self, name, record, formatWith=None, formatTitleWith=None):
    """
    replace the column with the given name with a link

    Args:
        name: the key of the column in the record
        record(dict): the record to modify in place
        formatWith: optional %-format string applied to the value to get the link url
        formatTitleWith: optional %-format string applied to the value to get the link title
    """
    if name not in record:
        return
    value = record[name]
    if value is None:
        # a missing value is shown as an empty string, not as a link
        record[name] = ""
        return
    lurl = value if formatWith is None else formatWith % value
    title = value if formatTitleWith is None else formatTitleWith % value
    record[name] = Link.create(lurl, title)

setup_view()

set up my view elements

Source code in scan/entity_view.py
29
30
31
32
33
def setup_view(self):
    """
    set up my view elements

    creates the ListOfDictsGrid that show() loads the records into
    """
    self.lod_grid = ListOfDictsGrid()

show(rowHandler=None, lodKeyHandler=None)

show my given entity manager

Source code in scan/entity_view.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def show(self, rowHandler=None, lodKeyHandler=None):
    """
    show my given entity manager

    Args:
        rowHandler: optional callable applied to each row dict;
            defaults to defaultRowHandler
        lodKeyHandler: optional callable that receives the list of keys
    """
    records = self.em.getList()
    if len(records) > 0:
        lodKeys = list(records[0].getJsonTypeSamples()[0].keys())
    else:
        lodKeys = ["url"]
    if lodKeyHandler is not None:
        lodKeyHandler(lodKeys)
    # work on copies so the handlers do not modify the entities
    dictList = [vars(d).copy() for d in records]
    if rowHandler is None:
        rowHandler = self.defaultRowHandler
    for row in dictList:
        rowHandler(row)
    self.lod_grid.load_lod(dictList)

EntityView

Source code in scan/entity_view.py
12
13
14
15
16
17
class EntityView:
    """
    a view for a single entity
    """

    def __init__(self, entity: JSONAble):
        """
        constructor

        Args:
            entity(JSONAble): the entity to keep a reference to
        """
        self.entity = entity

__init__(entity)

Source code in scan/entity_view.py
15
16
17
def __init__(self, entity: JSONAble):
    """
    constructor

    Args:
        entity(JSONAble): the entity to keep a reference to
    """
    self.entity = entity

folderwatcher

Created on 2021-04-21

see https://stackoverflow.com/a/66110795/1497139

Handler

Bases: PatternMatchingEventHandler

handle changes for a given wildcard pattern

Source code in scan/folderwatcher.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class Handler(PatternMatchingEventHandler):
    """
    handle changes for a given wildcard pattern
    """

    def __init__(self, callback, patterns, debug=False):
        """
        construct me

        Args:
            callback: the function to call
            patterns: the patterns to trigger on
            debug(bool): if True print debug output
        """
        self.callback = callback
        self.debug = debug
        # hand the patterns to the PatternMatchingEventHandler base class
        super().__init__(
            patterns=patterns,
            ignore_directories=True,
            case_sensitive=False,
        )

    def on_any_event(self, event):
        """
        print the event if debugging is on and call the callback
        with the event's src_path for "modified" events

        Args:
            event: the event; event_type and src_path are read from it
        """
        if self.debug:
            print(
                f"[{time.asctime()}] noticed: [{event.event_type}] on: [{event.src_path}] "
            )
        # only "modified" events are forwarded to the callback
        if event.event_type == "modified":
            self.callback(event.src_path)

__init__(callback, patterns, debug=False)

construct me

Parameters:

Name Type Description Default
callback

the function to call

required
patterns

the patterns to trigger on

required
debug(bool)

if True print debug output

False
Source code in scan/folderwatcher.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(self, callback, patterns, debug=False):
    """
    construct me

    Args:
        callback: the function to call
        patterns: the patterns to trigger on
        debug(bool): if True print debug output
    """
    self.callback, self.debug = callback, debug
    # hand the patterns to the PatternMatchingEventHandler base class
    baseOptions = dict(
        patterns=patterns,
        ignore_directories=True,
        case_sensitive=False,
    )
    PatternMatchingEventHandler.__init__(self, **baseOptions)

Watcher

watch the given path with the given callback

Source code in scan/folderwatcher.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Watcher:
    """
    watch the given path with the given callback
    """

    def __init__(self, path, patterns=["*.pdf", "*.jpg"], debug=False):
        """
        construct me for the given path

        Args:
            path(str): the directory to observe
            patterns(list): a list of wildcard patterns
            debug(bool): True if debugging should be switched on
        """
        self.observer = Observer()
        self.path = path
        # copy so that the shared default list is never handed out
        self.patterns = list(patterns)
        self.debug = debug

    def run(self, callback, sleepTime=1, limit=sys.maxsize):
        """
        run me

        The observer is stopped and joined when the limit is reached,
        when an error occurs and when the loop is interrupted.

        Args:
            callback(func): the function to trigger when a file appears
            sleepTime(float): how often to check for incoming files - default: 1.0 secs
            limit(float): the maximum time to run the server default: unlimited
        """
        event_handler = Handler(callback, patterns=self.patterns, debug=self.debug)
        self.observer.schedule(event_handler, self.path, recursive=True)
        self.observer.start()
        runTime = 0
        try:
            while runTime < limit:
                time.sleep(sleepTime)
                runTime += sleepTime
        except Exception as ex:
            # best effort: errors end the loop and are only reported in debug mode
            if self.debug:
                print("Error %s " % str(ex))
        finally:
            # always release the observer thread, not only on errors
            self.observer.stop()
            self.observer.join()

__init__(path, patterns=['*.pdf', '*.jpg'], debug=False)

construct me for the given path

Parameters: path (str) is the directory to observe; patterns (list) is a list of wildcard patterns; debug (bool) is True if debugging should be switched on.

Source code in scan/folderwatcher.py
20
21
22
23
24
25
26
27
28
29
30
31
def __init__(self, path, patterns=["*.pdf", "*.jpg"], debug=False):
    """
    construct me for the given path

    Args:
        path(str): the directory to observe
        patterns(list): a list of wildcard patterns
        debug(bool): True if debugging should be switched on
    """
    self.observer = Observer()
    self.path = path
    # copy so that the shared default list is never handed out
    self.patterns = list(patterns)
    self.debug = debug

run(callback, sleepTime=1, limit=sys.maxsize)

run me

Parameters:

Name Type Description Default
callback(func)

the function to trigger when a file appears

required
sleepTime(float)

how often to check for incoming files - default: 1.0 secs

1
limit(float)

the maximum time to run the server default: unlimited

sys.maxsize
Source code in scan/folderwatcher.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def run(self, callback, sleepTime=1, limit=sys.maxsize):
    """
    run me

    The observer is stopped and joined when the limit is reached,
    when an error occurs and when the loop is interrupted.

    Args:
        callback(func): the function to trigger when a file appears
        sleepTime(float): how often to check for incoming files - default: 1.0 secs
        limit(float): the maximum time to run the server default: unlimited
    """
    event_handler = Handler(callback, patterns=self.patterns, debug=self.debug)
    self.observer.schedule(event_handler, self.path, recursive=True)
    self.observer.start()
    runTime = 0
    try:
        while runTime < limit:
            time.sleep(sleepTime)
            runTime += sleepTime
    except Exception as ex:
        # best effort: errors end the loop and are only reported in debug mode
        if self.debug:
            print("Error %s " % str(ex))
    finally:
        # always release the observer thread, not only on errors
        self.observer.stop()
        self.observer.join()

logger

Created on 2021-11-02

@author: wf

Logger

Bases: object

a logger module

Source code in scan/logger.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Logger:
    """
    a logger module

    The root logging configuration is set to DEBUG level when this
    class body is executed.
    """

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    @staticmethod
    def log(msg: str):
        """
        log the given message at info level

        Args:
            msg(str): the message to log
        """
        Logger.logger.info(msg)

    @staticmethod
    def logException(ex):
        """
        log the given exception via logger.exception

        Args:
            ex: the exception to log
        """
        Logger.logger.exception(ex)

pdf

PDFMiner

PDFMiner.six wrapper to get PDF Text

Source code in scan/pdf.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class PDFMiner:
    """
    PDFMiner.six wrapper to get PDF Text
    """

    @classmethod
    def getPDFText(cls, pdfFilenamePath, throwError: bool = True):
        """
        get the text of the given PDF file

        Args:
            pdfFilenamePath: the path of the PDF file to read
            throwError(bool): if True re-raise errors that occur while the
                PDFDocument is created, otherwise return an empty string

        Return:
            str: the extracted text, or "" if the document could not be
            created (and throwError is False) or is not extractable
        """
        retstr = StringIO()
        # the file has to stay open while the pages are processed;
        # the with block closes it on every exit path
        with open(pdfFilenamePath, "rb") as pdfFile:
            parser = PDFParser(pdfFile)
            try:
                document = PDFDocument(parser)
            except Exception as e:
                errMsg = f"error {pdfFilenamePath}:{str(e)}"
                print(errMsg)
                if throwError:
                    raise
                return ""
            if not document.is_extractable:
                print(pdfFilenamePath, "Warning: could not extract text from pdf file.")
                return ""
            rsrcmgr = PDFResourceManager()
            device = TextConverter(rsrcmgr, retstr, laparams=LAParams())
            interpreter = PDFPageInterpreter(rsrcmgr, device)
            for page in PDFPage.create_pages(document):
                interpreter.process_page(page)
            return retstr.getvalue()

product

Created on 2023-11-16

@author: wf

Product dataclass

Data class representing a product.

Attributes:

Name Type Description
title str

The title of the product.

image_url str

The URL of the product image.

price str

The price of the product.

asin Optional[str]

The Amazon Standard Identification Number (ASIN) of the product, which is a unique identifier on Amazon's platform.

Source code in scan/product.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@dataclass
class Product:
    """
    Data class representing a product.

    Attributes:
        title (str): The title of the product.
        image_url (str): The URL of the product image.
        price (str): The price of the product.
        asin (Optional[str]): The Amazon Standard Identification Number (ASIN) of the product,
                              which is a unique identifier on Amazon's platform.
        gtin (Optional[str]): An optional gtin code of the product; shown as "Code:" in as_html.
    """

    title: str
    image_url: str
    price: str
    asin: Optional[str] = None
    gtin: Optional[str] = None

    @property
    def amazon_url(self) -> Optional[str]:
        """
        Returns the product page URL built from the ASIN, or None if no ASIN is set.
        """
        return f"https://www.amazon.com/dp/{self.asin}" if self.asin else None

    def as_html(self, img_size: int = 128) -> str:
        """
        Returns an HTML representation of the product with an image thumbnail and a link to the product page.

        All field values are HTML-escaped so that characters such as quotes,
        "<" or "&" in a title or URL cannot break or inject markup.

        Parameters:
            img_size (int): Size of the image thumbnail.

        Returns:
            str: HTML string representation of the product.
        """
        # local import keeps this block self-contained
        from html import escape

        title = escape(str(self.title))
        image_url = escape(str(self.image_url))
        amazon_url = self.amazon_url
        parts = [
            "<div>",
            f'<img src="{image_url}" alt="{title}" width="{img_size}" height="{img_size}"/>',
        ]
        if amazon_url:
            parts.append(f' <a href="{escape(amazon_url)}">{title}</a>')
        else:
            parts.append(f" {title}")
        if self.gtin:
            parts.append(f"Code: {escape(str(self.gtin))}")
        parts.append(f" - {escape(str(self.price))}")
        parts.append("</div>")
        return "".join(parts)

as_html(img_size=128)

Returns an HTML representation of the product with an image thumbnail and a link to the product page.

Parameters:

Name Type Description Default
img_size int

Size of the image thumbnail.

128

Returns:

Name Type Description
str str

HTML string representation of the product.

Source code in scan/product.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def as_html(self, img_size: int = 128) -> str:
    """
    Returns an HTML representation of the product with an image thumbnail and a link to the product page.

    All field values are HTML-escaped so that characters such as quotes,
    "<" or "&" in a title or URL cannot break or inject markup.

    Parameters:
        img_size (int): Size of the image thumbnail.

    Returns:
        str: HTML string representation of the product.
    """
    # local import keeps this block self-contained
    from html import escape

    title = escape(str(self.title))
    image_url = escape(str(self.image_url))
    amazon_url = self.amazon_url
    parts = [
        "<div>",
        f'<img src="{image_url}" alt="{title}" width="{img_size}" height="{img_size}"/>',
    ]
    if amazon_url:
        parts.append(f' <a href="{escape(amazon_url)}">{title}</a>')
    else:
        parts.append(f" {title}")
    if self.gtin:
        parts.append(f"Code: {escape(str(self.gtin))}")
    parts.append(f" - {escape(str(self.price))}")
    parts.append("</div>")
    return "".join(parts)

Products

Class to handle/manage product instances and make those persistent.

Attributes:

Name Type Description
store_path str

The file path where products are stored as JSON.

products List[Product]

List of product instances.

products_by_asin Dict[str, Product]

Dictionary mapping ASIN to products.

products_by_gtin Dict[str, Product]

Dictionary mapping gtin to products.

Source code in scan/product.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
class Products:
    """
    Class to handle/manage product instances and make those persistent.

    Attributes:
        store_path (str): The file path where products are stored as JSON.
        products (List[Product]): List of product instances.
        products_by_asin (Dict[str, Product]): Dictionary mapping ASIN to products.
        products_by_gtin (Dict[str, Product]): Dictionary mapping gtin to products.
    """

    def __init__(self, store_path: str = None):
        """
        Initialize the Products instance.

        Args:
            store_path (str, optional): The file path where products are stored as JSON.
                                       Defaults to ~/.scan2wiki/products.json.
        """
        self.store_path = store_path or expanduser("~/.scan2wiki/products.json")
        self.clear()

    def clear(self):
        """
        Clears the current product list and the associated mappings.
        """
        self.products = []
        self.products_by_asin = {}
        self.products_by_gtin = {}

    def add_product(self, product: Product):
        """
        Adds a product to the product list and updates the mappings.
        If a product with the same ASIN already exists, it updates the existing record
        and keeps the gtin mapping in sync with the updated gtin.

        Args:
            product (Product): The product instance to add.
        """
        existing_product = (
            self.products_by_asin.get(product.asin) if product.asin else None
        )
        if existing_product is not None:
            # drop the gtin entry of the existing record before its gtin changes
            old_gtin = existing_product.gtin
            if old_gtin and self.products_by_gtin.get(old_gtin) is existing_product:
                del self.products_by_gtin[old_gtin]
            existing_product.title = product.title
            existing_product.image_url = product.image_url
            existing_product.price = product.price
            existing_product.gtin = product.gtin
            if existing_product.gtin:
                self.products_by_gtin[existing_product.gtin] = existing_product
        else:
            # Add the product to the list and mappings
            self.products.append(product)
            if product.asin:
                self.products_by_asin[product.asin] = product
            if product.gtin:
                self.products_by_gtin[product.gtin] = product
            # Sort the products list by ASIN; an update never changes an
            # ASIN, so the order only needs fixing after an append
            self.products.sort(key=lambda p: p.asin if p.asin else "")

    def delete_product(self, asin: str):
        """
        Delete a product with the given ASIN and save the updated product list.
        Nothing happens if no product with the given ASIN is known.

        Args:
            asin (str): The ASIN of the product to delete.
        """
        product = self.products_by_asin.pop(asin, None)
        if product is None:
            return
        self.products.remove(product)
        # only remove the gtin entry if it still points to this product
        if product.gtin and self.products_by_gtin.get(product.gtin) is product:
            del self.products_by_gtin[product.gtin]
        self.save_to_json()  # Save the updated product list

    def get_aggrid_lod(self) -> List[Dict[str, str]]:
        """
        Generates a list of dictionaries for ag-Grid representation of the products.

        Returns:
            List[Dict[str, str]]: List of product information formatted for ag-Grid.
        """
        lod = []
        for index, product in enumerate(self.products, start=1):
            product_dict = {
                "#": str(index),
                "Product": product.as_html(),
                "ASIN": Link.create(product.amazon_url, product.asin)
                if product.asin
                else "",
                "Title": product.title,
                "gtin": product.gtin if product.gtin else "",
                "Price": product.price,
            }
            lod.append(product_dict)
        return lod

    def save_to_json(self, filename: str = None):
        """
        Saves the current list of products to a JSON file.

        Args:
            filename (str, optional): The filename where to save the JSON data.
                                      Defaults to the instance's store_path attribute.
        """
        filename = filename or self.store_path
        # Ensure the directory for the file exists; a bare filename has an
        # empty dirname, which os.makedirs would reject
        directory = dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)

        product_data = [product.__dict__ for product in self.products]
        with open(filename, "w") as file:
            json.dump(product_data, file, indent=2)

    def load_from_json(self, filepath: str = None):
        """
        Loads products from a JSON file and updates the current list and mappings.
        Nothing is loaded if the file does not exist.

        Args:
            filepath (str, optional): The filepath from which to load the JSON data.
                                      Defaults to the instance's store_path attribute.
        """
        filename = filepath or self.store_path
        if os.path.exists(filename):
            with open(filename, "r") as file:
                product_records = json.load(file)
            for product_record in product_records:
                self.add_product(Product(**product_record))

__init__(store_path=None)

Initialize the Products instance.

Parameters:

Name Type Description Default
store_path str

The file path where products are stored as JSON. Defaults to ~/.scan2wiki/products.json.

None
Source code in scan/product.py
73
74
75
76
77
78
79
80
81
82
def __init__(self, store_path: str = None):
    """
    Initialize the Products instance.

    Args:
        store_path (str, optional): The file path where products are stored as JSON.
                                   Defaults to ~/.scan2wiki/products.json.
    """
    self.store_path = store_path or expanduser("~/.scan2wiki/products.json")
    self.clear()

add_product(product)

Adds a product to the product list and updates the mappings. If a product with the same ASIN already exists, it updates the existing record.

Parameters:

Name Type Description Default
product Product

The product instance to add.

required
Source code in scan/product.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def add_product(self, product: Product):
    """
    Adds a product to the product list and updates the mappings.
    If a product with the same ASIN already exists, the existing record is
    updated in place and the GTIN mapping is kept in sync with the new GTIN.

    Args:
        product (Product): The product instance to add.
    """
    # Update product if it already exists in the by_asin list
    if product.asin and product.asin in self.products_by_asin:
        existing_product = self.products_by_asin[product.asin]
        previous_gtin = existing_product.gtin
        existing_product.title = product.title
        existing_product.image_url = product.image_url
        existing_product.price = product.price
        existing_product.gtin = product.gtin
        # the GTIN may have changed with the update: drop the stale lookup
        # entry (only if it still points to this record) and register the
        # new GTIN so that products_by_gtin never lags behind the record
        if (
            previous_gtin != product.gtin
            and self.products_by_gtin.get(previous_gtin) is existing_product
        ):
            del self.products_by_gtin[previous_gtin]
        if product.gtin:
            self.products_by_gtin[product.gtin] = existing_product
    else:
        # Add the product to the list and mappings
        self.products.append(product)
        if product.asin:
            self.products_by_asin[product.asin] = product
        if product.gtin:
            self.products_by_gtin[product.gtin] = product

    # Sort the products list by ASIN, products without ASIN come first
    self.products.sort(key=lambda p: p.asin if p.asin else "")

clear()

Clears the current product list and the associated mappings.

Source code in scan/product.py
84
85
86
87
88
89
90
def clear(self):
    """
    Reset the store to an empty state.

    The product list and both lookup dictionaries (by ASIN and by GTIN)
    are replaced with fresh empty containers.
    """
    self.products_by_gtin = dict()
    self.products_by_asin = dict()
    self.products = list()

delete_product(asin)

Delete a product with the given ASIN.

Parameters:

Name Type Description Default
asin str

The ASIN of the product to delete.

required
Source code in scan/product.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def delete_product(self, asin: str):
    """
    Delete a product with the given ASIN.

    The product is removed from the product list and from both lookup
    mappings, then the change is persisted with save_to_json.
    An unknown ASIN is ignored and nothing is saved.

    Args:
        asin (str): The ASIN of the product to delete.
    """
    # self.products is the plain list of products; the lookup dictionaries
    # and save_to_json belong to this instance, not to the list
    product = self.products_by_asin.pop(asin, None)
    if product is None:
        return
    self.products.remove(product)
    if product.gtin and product.gtin in self.products_by_gtin:
        del self.products_by_gtin[product.gtin]
    self.save_to_json()  # Save the updated product list

get_aggrid_lod()

Generates a list of dictionaries for ag-Grid representation of the products.

Returns:

Type Description
List[Dict[str, str]]

List[Dict[str, str]]: List of product information formatted for ag-Grid.

Source code in scan/product.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def get_aggrid_lod(self) -> List[Dict[str, str]]:
    """
    Build the ag-Grid rows for the current products.

    Each product becomes one dictionary with a running number starting
    at 1, its html representation, a link to its amazon url (empty when the
    product has no ASIN), its title, its gtin (empty when missing) and
    its price.

    Returns:
        List[Dict[str, str]]: List of product information formatted for ag-Grid.
    """
    rows = []
    row_number = 0
    for product in self.products:
        row_number += 1
        product_html = product.as_html()
        if product.asin:
            asin_cell = Link.create(product.amazon_url, product.asin)
        else:
            asin_cell = ""
        rows.append(
            {
                "#": str(row_number),
                "Product": product_html,
                "ASIN": asin_cell,
                "Title": product.title,
                "gtin": product.gtin or "",
                "Price": product.price,
            }
        )
    return rows

load_from_json(filepath=None)

Loads products from a JSON file and updates the current list and mappings.

Parameters:

Name Type Description Default
filepath str

The filepath from which to load the JSON data. Defaults to the instance's store_path attribute.

None
Source code in scan/product.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def load_from_json(self, filepath: str = None):
    """
    Read product records from a JSON file and register each of them.

    Every record of the file is turned into a Product and handed to
    add_product. Nothing happens when the file does not exist.

    Args:
        filepath (str, optional): The filepath from which to load the JSON data.
                                  Defaults to the instance's store_path attribute.
    """
    source_path = self.store_path if not filepath else filepath
    if not os.path.exists(source_path):
        return
    with open(source_path, "r") as json_file:
        records = json.load(json_file)
    for record in records:
        # the record keys are passed on as Product constructor keywords
        self.add_product(Product(**record))

save_to_json(filename=None)

Saves the current list of products to a JSON file.

Parameters:

Name Type Description Default
filename str

The filename where to save the JSON data. Defaults to the instance's store_path attribute.

None
Source code in scan/product.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def save_to_json(self, filename: str = None):
    """
    Saves the current list of products to a JSON file.

    A missing parent directory of the target file is created first.

    Args:
        filename (str, optional): The filename where to save the JSON data.
                                  Defaults to the instance's store_path attribute.
    """
    filename = filename or self.store_path
    # dirname() yields "" for a bare file name such as "products.json" and
    # os.makedirs("") raises FileNotFoundError, so the directory is only
    # created when the path really has a directory component
    directory = dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # each product is stored as the plain dict of its attributes
    product_data = [product.__dict__ for product in self.products]
    with open(filename, "w") as file:
        json.dump(product_data, file, indent=2)

profiler

Created on 2021-10-26

@author: wf

Profiler

simple profiler

Source code in scan/profiler.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Profiler:
    """
    minimal stopwatch that reports how long an action took
    """

    def __init__(self, msg:str, profile=True):
        """
        remember the message and whether reporting is switched on

        Args:
            msg (str): the message to show if profiling is active
            profile (bool): True if messages should be shown
        """
        self.profile = profile
        self.msg = msg

    def start(self) -> str:
        """
        take the start timestamp and announce the action

        Return:
            str: start message
        """
        self.starttime = time.time()
        start_msg = f"Starting {self.msg} ..."
        if self.profile:
            print(start_msg)
        return start_msg

    def time(self, extraMsg=""):
        """
        measure the time since start() and print it if profile is active

        start() must have been called before since it sets the start timestamp

        Return:
            (float,str): time and message for time
        """
        duration = time.time() - self.starttime
        report = "{}{} took {:5.3f} s".format(self.msg, extraMsg, duration)
        if self.profile:
            print(report)
        return duration, report

__init__(msg, profile=True)

construct me with the given msg and profile active flag

Parameters:

Name Type Description Default
msg str

the message to show if profiling is active

required
profile bool

True if messages should be shown

True
Source code in scan/profiler.py
14
15
16
17
18
19
20
21
22
23
def __init__(self, msg:str, profile=True):
    """
    construct me with the given msg and profile active flag

    Args:
        msg (str): the message to show if profiling is active
        profile (bool): True if messages should be shown
    """
    self.msg = msg
    self.profile = profile

start()

start profiling

Return

str: start message

Source code in scan/profiler.py
25
26
27
28
29
30
31
32
33
34
35
36
def start(self) -> str:
    """
    take the start timestamp and announce the action

    Return:
        str: start message
    """
    self.starttime = time.time()
    start_msg = f"Starting {self.msg} ..."
    if self.profile:
        print(start_msg)
    return start_msg

time(extraMsg='')

time the action and print if profile is active

Return

(float,str): time and message for time

Source code in scan/profiler.py
38
39
40
41
42
43
44
45
46
47
48
49
def time(self, extraMsg=""):
    """
    measure the time since start() and print it if profile is active

    start() must have been called before since it sets the start timestamp

    Return:
        (float,str): time and message for time
    """
    duration = time.time() - self.starttime
    report = "{}{} took {:5.3f} s".format(self.msg, extraMsg, duration)
    if self.profile:
        print(report)
    return duration, report

scan_cmd

Created on 2023-11-14

@author: wf

ScanCmd

Bases: WebserverCmd

Command line for scan2wiki web server

Source code in scan/scan_cmd.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ScanCmd(WebserverCmd):
    """
    command line entry point of the scan2wiki web server
    """

    def getArgParser(self, description: str, version_msg) -> ArgumentParser:
        """
        extend the argument parser of the base command with the
        verbose, root_path and webcam options

        Args:
            description (str): passed on to the base class parser
            version_msg: passed on to the base class parser

        Returns:
            ArgumentParser: the extended parser
        """
        arg_parser = super().getArgParser(description, version_msg)
        arg_parser.add_argument(
            "-v",
            "--verbose",
            action="store_true",
            help="show verbose output [default: %(default)s]",
        )
        examples_dir = ScanSolution.examples_path()
        arg_parser.add_argument(
            "-rp",
            "--root_path",
            default=examples_dir,
            help="path to example pdf files [default: %(default)s]",
        )
        webcam_help = "url of webcam for scans [default: %(default)s]"
        arg_parser.add_argument("-wc", "--webcam", help=webcam_help)
        return arg_parser

getArgParser(description, version_msg)

override the default argparser call

Source code in scan/scan_cmd.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def getArgParser(self, description: str, version_msg) -> ArgumentParser:
    """
    extend the argument parser of the base command with the
    verbose, root_path and webcam options

    Args:
        description (str): passed on to the base class parser
        version_msg: passed on to the base class parser

    Returns:
        ArgumentParser: the extended parser
    """
    arg_parser = super().getArgParser(description, version_msg)
    arg_parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="show verbose output [default: %(default)s]",
    )
    examples_dir = ScanSolution.examples_path()
    arg_parser.add_argument(
        "-rp",
        "--root_path",
        default=examples_dir,
        help="path to example pdf files [default: %(default)s]",
    )
    webcam_help = "url of webcam for scans [default: %(default)s]"
    arg_parser.add_argument("-wc", "--webcam", help=webcam_help)
    return arg_parser

main(argv=None)

main call

Source code in scan/scan_cmd.py
42
43
44
45
46
47
48
def main(argv: list = None):
    """
    command line entry point

    Creates a ScanCmd for the ScanWebServer configuration and runs it.

    Args:
        argv (list, optional): the command line arguments handed to cmd_main

    Returns:
        the exit code reported by cmd_main
    """
    scan_cmd = ScanCmd(
        config=ScanWebServer.get_config(),
        webserver_cls=ScanWebServer,
    )
    return scan_cmd.cmd_main(argv)

scan_webserver

Created on 2023-11-14

@author: wf

ScanSolution

Bases: InputWebSolution

the Scan solution

Source code in scan/scan_webserver.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class ScanSolution(InputWebSolution):
    """
    the per client solution of the scan2wiki web application
    """

    def __init__(self, webserver: ScanWebServer, client: Client):
        """
        construct the solution for one client

        Delegates to the constructor of the base solution and creates
        logging stream handlers for stdout and stderr.

        Args:
            webserver (ScanWebServer): The webserver instance associated with this context.
            client (Client): The client instance this context is associated with.
        """
        super().__init__(webserver, client)
        self.stderr_handler = logging.StreamHandler(stream=sys.stderr)
        self.stdout_handler = logging.StreamHandler(stream=sys.stdout)

    async def setup_footer(self):
        """
        set up the footer via the base solution with the log view
        switched on, limited to 100 lines, and handle_logging switched off
        """
        footer_options = dict(
            with_log=True,
            handle_logging=False,
            max_lines=100,
            log_classes="w-full h-20",
        )
        await super().setup_footer(**footer_options)

    async def webcam(self):
        """
        show the webcam form for the webcam url given on the command line
        """

        def setup_webcam():
            webcam_url = self.args.webcam
            self.webcam_form = WebcamForm(self, webcam_url)

        await self.setup_content_div(setup_webcam)

    async def upload(self, path: str = None):
        """
        show the upload form for the given path

        Args:
            path (str, optional): the path of the file to upload
        """

        def setup_upload_form():
            if path:
                ui.notify(f"upload of {path} requested")
            wiki_users = self.webserver.wiki_users
            self.upload_form = UploadForm(self, wiki_users, path)

        await self.setup_content_div(setup_upload_form)

    @classmethod
    def examples_path(cls) -> str:
        """
        get the absolute path of the scan2wiki_examples directory that
        sits next to the directory of this module

        Returns:
            str: the absolute examples path
        """
        module_dir = os.path.dirname(__file__)
        return os.path.abspath(os.path.join(module_dir, "../scan2wiki_examples"))

    def update_scans(self):
        """
        reload the scans grid from the scan files of the webserver

        Any exception is passed to handle_exception.
        """
        try:
            scan_rows = self.webserver.scans.get_scan_files()
            self.lod_grid.load_lod(scan_rows)
            self.lod_grid.sizeColumnsToFit()
        except Exception as error:
            self.handle_exception(error)

    async def show_archives(self):
        """
        show the archives page
        """

        def setup_show_archives():
            """
            create and show the view of the archive manager
            """
            EntityManagerView(self.webserver.am).show()

        await self.setup_content_div(setup_show_archives)

    def configure_menu(self):
        """
        add the Webcam and Archives link buttons to the menu
        """
        menu_entries = (
            ("Webcam", "photo_camera", "/webcam"),
            ("Archives", "database", "/archives"),
        )
        for name, icon_name, target in menu_entries:
            self.link_button(name=name, icon_name=icon_name, target=target)

    async def home(self):
        """
        show the main page with the grid of scanned files
        """

        def setup_home():
            self.lod_grid = ListOfDictsGrid()
            self.update_scans()

        await self.setup_content_div(setup_home)

__init__(webserver, client)

Initialize the solution

Calls the constructor of the base solution.

Args:

webserver (ScanWebServer): The webserver instance associated with this context.

client (Client): The client instance this context is associated with.

Source code in scan/scan_webserver.py
115
116
117
118
119
120
121
122
123
124
125
126
def __init__(self, webserver: ScanWebServer, client: Client):
    """
    construct the solution for one client

    Delegates to the constructor of the base solution and creates
    logging stream handlers for stdout and stderr.

    Args:
        webserver (ScanWebServer): The webserver instance associated with this context.
        client (Client): The client instance this context is associated with.
    """
    super().__init__(webserver, client)
    self.stderr_handler = logging.StreamHandler(stream=sys.stderr)
    self.stdout_handler = logging.StreamHandler(stream=sys.stdout)

configure_menu()

configure additional non-standard menu entries

Source code in scan/scan_webserver.py
189
190
191
192
193
194
195
def configure_menu(self):
    """
    add the Webcam and Archives link buttons to the menu
    """
    menu_entries = (
        ("Webcam", "photo_camera", "/webcam"),
        ("Archives", "database", "/archives"),
    )
    for name, icon_name, target in menu_entries:
        self.link_button(name=name, icon_name=icon_name, target=target)

home() async

provide the main content page

Source code in scan/scan_webserver.py
197
198
199
200
201
202
203
204
205
206
async def home(self):
    """
    show the main page with the grid of scanned files
    """

    def setup_home():
        # a fresh grid is created per page call and then filled with the scans
        self.lod_grid = ListOfDictsGrid()
        self.update_scans()

    await self.setup_content_div(setup_home)

setup_footer() async

add handlers for stdout and stderr

Source code in scan/scan_webserver.py
128
129
130
131
132
133
134
135
136
137
async def setup_footer(self):
    """
    set up the footer via the base solution with the log view
    switched on, limited to 100 lines, and handle_logging switched off
    """
    footer_options = dict(
        with_log=True,
        handle_logging=False,
        max_lines=100,
        log_classes="w-full h-20",
    )
    await super().setup_footer(**footer_options)

show_archives() async

show archives

Source code in scan/scan_webserver.py
175
176
177
178
179
180
181
182
183
184
185
186
187
async def show_archives(self):
    """
    show the archives page
    """

    def setup_show_archives():
        """
        create and show the view of the archive manager
        """
        EntityManagerView(self.webserver.am).show()

    await self.setup_content_div(setup_show_archives)

update_scans()

update the scans grid

Source code in scan/scan_webserver.py
164
165
166
167
168
169
170
171
172
173
def update_scans(self):
    """
    reload the scans grid from the scan files of the webserver

    Any exception is passed to handle_exception.
    """
    try:
        scan_rows = self.webserver.scans.get_scan_files()
        self.lod_grid.load_lod(scan_rows)
        self.lod_grid.sizeColumnsToFit()
    except Exception as error:
        self.handle_exception(error)

upload(path=None) async

handle upload requests

Source code in scan/scan_webserver.py
145
146
147
148
149
150
151
152
153
154
155
async def upload(self, path: str = None):
    """
    show the upload form for the given path

    Args:
        path (str, optional): the path of the file to upload
    """

    def setup_upload_form():
        if path:
            ui.notify(f"upload of {path} requested")
        wiki_users = self.webserver.wiki_users
        self.upload_form = UploadForm(self, wiki_users, path)

    await self.setup_content_div(setup_upload_form)

ScanWebServer

Bases: InputWebserver

server for Document Management system with option to scan to Semantic Mediawikis

Source code in scan/scan_webserver.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class ScanWebServer(InputWebserver):
    """
    server for Document Management system with option to scan to Semantic Mediawikis
    """

    @classmethod
    def get_config(cls) -> WebserverConfig:
        """
        get the configuration for this Webserver

        Returns:
            WebserverConfig: the result of WebserverConfig.get for the
            scan2wiki settings, with ScanSolution set as solution class
        """
        copy_right = "(c)2020-2024 Wolfgang Fahl"
        config = WebserverConfig(
            copy_right=copy_right,
            version=Version(),
            default_port=8334,
            short_name="scan2wiki",
            timeout=10.0
        )
        server_config = WebserverConfig.get(config)
        server_config.solution_class = ScanSolution
        return server_config

    def __init__(self):
        """
        Constructs all the necessary attributes for the WebServer object
        and registers the upload, webcam, archives, delete and files routes.
        """
        InputWebserver.__init__(self, config=ScanWebServer.get_config())
        self.scandir = DMSStorage.getScanDir()
        self.scans = Scans(self.scandir)
        self.wiki_users = WikiUser.getWikiUsers()
        self.sql_db = DMSStorage.getSqlDB()
        self.am = ArchiveManager.getInstance()
        self.fm = FolderManager.getInstance()
        self.dm = DocumentManager.getInstance()
        self.archivesByName, _dup = self.am.getLookup("name")

        @ui.page("/upload/{path:path}")
        async def upload(client: Client, path: str = None):
            return await self.page(
                client, ScanSolution.upload,path
            )

        @ui.page("/webcam")
        async def webcam(client: Client):
            return await self.page(
                client, ScanSolution.webcam
            )

        @ui.page("/archives")
        async def show_archives(client: Client):
            return await self.page(
                client, ScanSolution.show_archives
            )

        @app.get("/delete/{path:path}")
        def delete(path: str = None):
            self.scans.delete(path)
            return RedirectResponse("/")

        @app.route("/files")
        @app.get("/files/{path:path}")
        def files(path: str = "."):
            return self.files(path)

    def files(self, path: str = "."):
        """
        show the files in the given path

        A directory becomes the directory of the current scans and the
        request is redirected to the home page, a file is served directly.
        Anything else - including a path that would leave the scan
        directory - is answered with a 404 response.

        Args:
            path (str): the path to render
        """
        import html

        fullpath = f"{self.scandir}/{path}"
        # the path comes straight from the request url: make sure that
        # ".." segments can not be used to reach files outside the scandir
        root = os.path.abspath(self.scandir)
        try:
            inside = os.path.commonpath([root, os.path.abspath(fullpath)]) == root
        except ValueError:
            # e.g. paths on different drives have no common path
            inside = False
        if inside and os.path.isdir(fullpath):
            self.scans = Scans(fullpath)
            return RedirectResponse("/")
        elif inside and os.path.isfile(fullpath):
            file_response = FileResponse(fullpath)
            return file_response
        else:
            # escape the request supplied path before echoing it as html
            msg = f"invalid path: {html.escape(path)}"
            return HTMLResponse(content=msg, status_code=404)

__init__()

Constructs all the necessary attributes for the WebServer object.

Source code in scan/scan_webserver.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def __init__(self):
    """
    Constructs all the necessary attributes for the WebServer object
    and registers the upload, webcam, archives, delete and files routes.
    """
    InputWebserver.__init__(self, config=ScanWebServer.get_config())
    # directory with the scanned files and the helper that lists/deletes them
    self.scandir = DMSStorage.getScanDir()
    self.scans = Scans(self.scandir)
    self.wiki_users = WikiUser.getWikiUsers()
    self.sql_db = DMSStorage.getSqlDB()
    # managers for archives, folders and documents
    self.am = ArchiveManager.getInstance()
    self.fm = FolderManager.getInstance()
    self.dm = DocumentManager.getInstance()
    # only the lookup itself is kept, the second result is ignored
    self.archivesByName, _dup = self.am.getLookup("name")

    # page routes: each one is rendered by a ScanSolution method via self.page
    @ui.page("/upload/{path:path}")
    async def upload(client: Client, path: str = None):
        return await self.page(
            client, ScanSolution.upload,path
        )

    @ui.page("/webcam")
    async def webcam(client: Client):
        return await self.page(
            client, ScanSolution.webcam
        )

    @ui.page("/archives")
    async def show_archives(client: Client):
        return await self.page(
            client, ScanSolution.show_archives
        )

    # deletes the given scan file and returns to the home page
    # NOTE(review): the path is taken from the url as is - confirm that
    # Scans.delete restricts it to the scan directory
    @app.get("/delete/{path:path}")
    def delete(path: str = None):
        self.scans.delete(path)
        return RedirectResponse("/")

    # both "/files" and "/files/<path>" are delegated to the files method
    @app.route("/files")
    @app.get("/files/{path:path}")
    def files(path: str = "."):
        return self.files(path)

files(path='.')

show the files in the given path

Parameters:

Name Type Description Default
path str

the path to render

'.'
Source code in scan/scan_webserver.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def files(self, path: str = "."):
    """
    show the files in the given path

    A directory becomes the directory of the current scans and the
    request is redirected to the home page, a file is served directly.
    Anything else - including a path that would leave the scan
    directory - is answered with a 404 response.

    Args:
        path (str): the path to render
    """
    import html

    fullpath = f"{self.scandir}/{path}"
    # the path comes straight from the request url: make sure that
    # ".." segments can not be used to reach files outside the scandir
    root = os.path.abspath(self.scandir)
    try:
        inside = os.path.commonpath([root, os.path.abspath(fullpath)]) == root
    except ValueError:
        # e.g. paths on different drives have no common path
        inside = False
    if inside and os.path.isdir(fullpath):
        self.scans = Scans(fullpath)
        return RedirectResponse("/")
    elif inside and os.path.isfile(fullpath):
        file_response = FileResponse(fullpath)
        return file_response
    else:
        # escape the request supplied path before echoing it as html
        msg = f"invalid path: {html.escape(path)}"
        return HTMLResponse(content=msg, status_code=404)

get_config() classmethod

get the configuration for this Webserver

Source code in scan/scan_webserver.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def get_config(cls) -> WebserverConfig:
    """
    build the webserver configuration for scan2wiki

    Returns:
        WebserverConfig: the result of WebserverConfig.get for the
        scan2wiki settings, with ScanSolution set as solution class
    """
    base_config = WebserverConfig(
        copy_right="(c)2020-2024 Wolfgang Fahl",
        version=Version(),
        default_port=8334,
        short_name="scan2wiki",
        timeout=10.0,
    )
    server_config = WebserverConfig.get(base_config)
    server_config.solution_class = ScanSolution
    return server_config

scans

Created on 2023-11-14

@author: wf

Scans

Class to handle operations related to scanned files.

Source code in scan/scans.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class Scans:
    """
    Class to handle operations related to scanned files.
    """

    def __init__(self, scandir: str):
        """
        Initialize the Scans object.

        Args:
            scandir (str): The directory where the scanned files are located.
        """
        self.scandir = scandir

    def get_full_path(self, path: str) -> str:
        """
        Generate the full path for a given relative path.

        Args:
            path (str): The relative path to be resolved.

        Returns:
            str: The full path combining the scandir and the provided relative path.
        """
        fullpath = os.path.join(self.scandir, path)
        return fullpath

    def get_file_link(self, path: str) -> tuple:
        """
        get a link to the given file

        Args:
            path (str): the path to the file

        Returns:
            tuple: the url of the RESTFul API to show the file and
            the html markup of a link to that url
        """
        url = f"/files/{path}"
        link = Link.create(url, text=path)
        return url, link

    def get_scan_files(self) -> List[Dict[str, object]]:
        """
        Retrieve the scanned files information from the directory.

        Returns:
            List[Dict[str, object]]: A list of dictionaries, each representing a file,
            sorted by last modified time with the newest file first.
            Each dictionary contains details like file name, last modified time, size, and links for delete and upload actions.

        Raises:
            Exception: if the details of a directory entry can not be read
        """
        scan_files = []
        for path in os.listdir(self.scandir):
            try:
                fullpath = self.get_full_path(path)
                ftime = datetime.fromtimestamp(os.path.getmtime(fullpath))
                ftimestr = ftime.strftime("%Y-%m-%d %H:%M:%S")
                size = os.path.getsize(fullpath)
                _file_url, file_link = self.get_file_link(path)
                scan_file = {
                    # placeholder to keep "#" the first column,
                    # the rows are numbered after sorting
                    "#": 0,
                    "name": file_link,
                    "lastModified": ftimestr,
                    "size": size,
                    "delete": Link.create(url=f"/delete/{path}", text="❌"),
                    "upload": Link.create(url=f"/upload/{path}", text="⇧"),
                }
                scan_files.append(scan_file)
            except Exception as ex:
                msg = f"error {str(ex)} for {path}"
                # keep the original error as explicit cause
                raise Exception(msg) from ex
        scan_files.sort(key=lambda x: x["lastModified"], reverse=True)
        for index, scan_file in enumerate(scan_files, start=1):
            scan_file["#"] = index
        return scan_files

    def delete(self, path:str):
        """
        delete the given file of the scan directory

        Args:
            path (str): the file to delete

        Raises:
            ValueError: if the path points outside of the scan directory
        """
        # the path may come from a request url: ".." segments or an
        # absolute path must not allow removing files outside the scandir
        root = os.path.abspath(self.scandir)
        fullpath = os.path.abspath(self.get_full_path(path))
        if os.path.commonpath([root, fullpath]) != root:
            raise ValueError(f"invalid path: {path}")
        os.remove(fullpath)

__init__(scandir)

Initialize the Scans object.

Parameters:

Name Type Description Default
scandir str

The directory where the scanned files are located.

required
Source code in scan/scans.py
19
20
21
22
23
24
25
26
def __init__(self, scandir: str):
    """
    Initialize the Scans object.

    Args:
        scandir (str): The directory where the scanned files are located.
    """
    self.scandir = scandir

delete(path)

Parameters:

Name Type Description Default
path str

the file to delete

required
Source code in scan/scans.py
88
89
90
91
92
93
94
def delete(self, path:str):
    """
    delete the given file of the scan directory

    Args:
        path (str): the file to delete

    Raises:
        ValueError: if the path points outside of the scan directory
    """
    # the path may come from a request url: ".." segments or an
    # absolute path must not allow removing files outside the scandir
    root = os.path.abspath(self.scandir)
    fullpath = os.path.abspath(self.get_full_path(path))
    if os.path.commonpath([root, fullpath]) != root:
        raise ValueError(f"invalid path: {path}")
    os.remove(fullpath)

get_file_link(path)

get a link to the given file

Returns:

Name Type Description
str str

The html markup for the RESTFul API to show the file

Source code in scan/scans.py
41
42
43
44
45
46
47
48
49
50
51
52
53
def get_file_link(self, path: str) -> tuple:
    """
    get a link to the given file

    Args:
        path (str): the path to the file

    Returns:
        tuple: the url of the RESTFul API to show the file and
        the html markup of a link to that url
    """
    url = f"/files/{path}"
    link = Link.create(url, text=path)
    return url, link

get_full_path(path)

Generate the full path for a given relative path.

Parameters:

Name Type Description Default
path str

The relative path to be resolved.

required

Returns:

Name Type Description
str str

The full path combining the scandir and the provided relative path.

Source code in scan/scans.py
28
29
30
31
32
33
34
35
36
37
38
39
def get_full_path(self, path: str) -> str:
    """
    Join the scan directory and a relative path.

    Args:
        path (str): The relative path to be resolved.

    Returns:
        str: The result of os.path.join for the scandir and the given path.
    """
    return os.path.join(self.scandir, path)

get_scan_files()

Retrieve the scanned files information from the directory.

Returns:

Type Description
List[Dict[str, object]]

List[Dict[str, object]]: A list of dictionaries, each representing a file.

List[Dict[str, object]]

Each dictionary contains details like file name, last modified time, size, and links for delete and upload actions.

Source code in scan/scans.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def get_scan_files(self) -> List[Dict[str, object]]:
    """
    Collect one record per entry of the scan directory.

    Returns:
        List[Dict[str, object]]: A list of dictionaries, each representing a file,
        sorted by last modified time with the newest file first and numbered from 1.
        Each dictionary contains the file link, last modified time, size, and links for delete and upload actions.

    Raises:
        Exception: if the details of a directory entry can not be read
    """
    records = []
    position = 0
    for path in os.listdir(self.scandir):
        position += 1
        try:
            fullpath = self.get_full_path(path)
            modified = datetime.fromtimestamp(os.path.getmtime(fullpath))
            byte_size = os.path.getsize(fullpath)
            _file_url, file_link = self.get_file_link(path)
            records.append(
                {
                    "#": position,
                    "name": file_link,
                    "lastModified": modified.strftime("%Y-%m-%d %H:%M:%S"),
                    "size": byte_size,
                    "delete": Link.create(url=f"/delete/{path}", text="❌"),
                    "upload": Link.create(url=f"/upload/{path}", text="⇧"),
                }
            )
        except Exception as ex:
            msg = f"error {str(ex)} for {path}"
            raise Exception(msg)
    # newest first; the timestamp format sorts chronologically as text
    records.sort(key=lambda record: record["lastModified"], reverse=True)
    # renumber so that "#" follows the sorted order
    for number, record in enumerate(records, start=1):
        record["#"] = number
    return records

upload

Created on 2023-11-14

@author: wf

UploadForm

upload form

Source code in scan/upload.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class UploadForm:
    """
    upload form

    Shows the controls for uploading a scanned file to a wiki next to
    an OCR text area and an embedded PDF view of the scanned file.
    """

    def __init__(self, solution, wiki_users: dict, path: str):
        """
        constructor

        Args:
            solution: the web solution this form belongs to; its webserver,
                log handlers, log view and select helper are used
            wiki_users (dict): the available wiki users by wiki id
            path (str): the scanned file, relative to the webserver's scandir
        """
        self.solution = solution
        self.webserver = solution.webserver
        self.rem_value = 48  # Default rem value
        self.red_link = "color: red;text-decoration: underline;"
        self.blue_link = "color: blue;text-decoration: underline;"
        self.debug = self.webserver.debug
        self.scandir = self.webserver.scandir
        self.scans = self.webserver.scans
        self.wiki_users = wiki_users
        self.path = path
        self.doc = Document()
        self.doc.fromFile(folderPath=self.scandir, file=path, local=True, withOcr=False)
        # setup_form creates self.progressbar which the log filter needs
        self.setup_form()
        self.upload_log_filter = UploadLogFilter(self.progressbar)
        self.webserver.logger.addHandler(self.solution.stdout_handler)
        self.webserver.logger.addHandler(self.solution.stderr_handler)
        # attach the counting filter to every logger known so far
        for logger_name in logging.Logger.manager.loggerDict:
            logger = logging.getLogger(logger_name)
            logger.propagate = True
            logger.addFilter(self.upload_log_filter)

        self.uploaded = False

    def setup_form(self):
        """
        setup the upload form

        The left side of the splitter holds the progress bar, the
        upload/ocr buttons and the page/wiki/category/topic inputs, the
        right side holds the OCR text area, the height slider and the
        embedded PDF.
        """
        with ui.splitter(value=30).classes("h-fit").style("flex:1") as self.splitter:
            with self.splitter.before:
                self.progressbar = NiceguiProgressbar(
                    100, "processing page", "steps"
                )
                with ui.card().tight():
                    with ui.card_section():
                        self.submit = ui.button("upload", on_click=self.run_upload)
                        self.ocr = ui.button("ocr", on_click=self.run_ocr)
                    with ui.card_section():
                        self.page_title = (
                            ui.input("pagetitle", on_change=self.update)
                            .props("size=80")
                            .bind_value_to(self.doc, "pageTitle")
                        )
                        # shown in red until the upload has succeeded
                        self.page_link = ui.html("pagelink").style(self.red_link)
                        wiki_selection = list(sorted(self.wiki_users.keys()))
                        self.wiki_user_select = self.solution.add_select(
                            title="Wiki",
                            selection=wiki_selection,
                            on_change=self.update,
                        )
                        (
                            self.scanned_file_url,
                            self.scanned_file_link,
                        ) = self.scans.get_file_link(self.path)
                        self.scanned_file_link_view = ui.html(
                            self.scanned_file_link
                        ).style(self.blue_link)
                        current_date = datetime.now()
                        # the current year is the default category
                        self.categories = ui.input(
                            "categories", value=str(current_date.year)
                        ).bind_value_to(self.doc, "categories")
                        self.topic = ui.input(
                            "topic", value="OCRDocument"
                        ).bind_value_to(self.doc, "topic")
            with self.splitter.after as self.pdf_container:
                with ui.element("div").classes("w-full h-full"):
                    # NOTE(review): props strings look whitespace-separated
                    # elsewhere - confirm that "rows=25;cols=80" sets both props
                    self.ocr_text_area = (
                        ui.textarea("Text")
                        .props("clearable")
                        .props("rows=25;cols=80")
                        .bind_value_to(self.doc, "ocrText")
                    )
                    ui.separator()
                    self.rem_slider = ui.slider(
                        min=10,
                        max=100,
                        step=1,
                        value=self.rem_value,
                        on_change=self.update_pdf_viewer_height,
                    )
                    # Embedding the PDF within a div that takes the full width and height
                    pdf_html = f"""<embed src="{self.scanned_file_url}" type="application/pdf" style="width:100%; height:100%;">"""
                    self.pdf_viewer = ui.html(pdf_html).classes("w-full h-[48rem]")

    async def run_ocr(self):
        """
        run the optical character recognition

        The recognized text is put into the OCR text area; exceptions are
        passed to the webserver's exception handler.
        """
        try:
            # one progress step per 150 log records
            self.upload_log_filter.reset(1, 150)
            time_msg = TimeMessage(f"OCR for {self.doc.name} ({self.doc.size})")
            ui.notify(time_msg)
            ocr_text = await run.io_bound(self.doc.getOcrText)
            self.ocr_text_area.value = ocr_text
            self.upload_log_filter.show_stats(self.solution.log_view)
            ui.notify(time_msg.done())
            self.update_progress(100)
        except Exception as ex:
            self.webserver.handle_exception(ex)

    async def update_pdf_viewer_height(self, e):
        """
        Update the height of the PDF viewer based on the slider value.

        Args:
            e: the slider change event; ``e.value`` is the new height in rem
        """
        self.rem_value = e.value
        new_height = f"h-[{self.rem_value}rem]"  # Calculate the new height in rem
        # classes is the element's styling call, not a plain attribute:
        # assigning a string to it would never reach the browser
        self.pdf_viewer.classes(replace=f"w-full {new_height}")
        self.splitter.update()

    def update_progress(self, progress):
        """
        set the progress bar to the given value

        Args:
            progress: the new value of the progress bar
        """
        self.progressbar.value = progress

    def update(self):
        """
        update the page_link depending on the page title or selected wiki

        Nothing happens when the selected wiki id is not one of the
        known wiki users.
        """
        page_title = self.page_title.value
        wiki_id = self.wiki_user_select.value
        if wiki_id in self.wiki_users:
            wiki_user = self.wiki_users[wiki_id]
            wiki_url = f"{wiki_user.url}{wiki_user.scriptPath}"
            wiki_link = Link.create(f"{wiki_url}/index.php/{page_title}", page_title)
            self.page_link.content = wiki_link
            # blue once the upload has been done, red before
            link_style = self.blue_link if self.uploaded else self.red_link
            self.page_link.style(link_style)

    def to_document(self, scandir, withOcr: bool = False):
        """
        convert my content to a document

        Args:
            scandir: the folder the scanned file is read from
            withOcr (bool): if True let the document run the OCR itself,
                otherwise the text of the OCR text area is taken over

        Returns:
            the new document for the scanned file of this form
        """
        doc = Document()
        # the scanned file name is kept in self.path - there is no
        # scanned_file input in this form
        doc.fromFile(scandir, self.path, local=True, withOcr=withOcr)
        doc.wikiUser = self.wiki_user_select.value
        doc.categories = self.categories.value
        if not withOcr:
            doc.ocrText = self.ocr_text_area.value
        return doc

    async def run_upload(self):
        """
        actually do the upload

        Uploads the document of this form to the selected wiki and marks
        the page link as uploaded; exceptions are passed to the
        solution's exception handler.
        """
        try:
            uploadDoc = self.doc
            # eight progress steps per log record
            self.upload_log_filter.reset(8, 1)
            time_msg = TimeMessage(f"uploading {uploadDoc.name} ({uploadDoc.size})")
            ui.notify(time_msg)
            wiki_id = self.wiki_user_select.value
            await run.io_bound(uploadDoc.uploadFile, wiki_id)
            self.upload_log_filter.show_stats(self.solution.log_view)
            ui.notify(time_msg.done())
            self.uploaded = True
            self.update()
        except Exception as ex:
            self.solution.handle_exception(ex)

__init__(solution, wiki_users, path)

constructor

Source code in scan/upload.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def __init__(self, solution, wiki_users: dict, path: str):
    """
    Create the upload form for one scanned file.

    Args:
        solution: the web solution this form belongs to; its webserver and
            its stdout/stderr log handlers are used
        wiki_users (dict): the available wiki users by wiki id
        path (str): the scanned file, relative to the webserver's scandir
    """
    webserver = solution.webserver
    self.solution = solution
    self.webserver = webserver
    # initial height of the pdf viewer in rem
    self.rem_value = 48
    self.red_link = "color: red;text-decoration: underline;"
    self.blue_link = "color: blue;text-decoration: underline;"
    self.debug = webserver.debug
    self.scandir = webserver.scandir
    self.scans = webserver.scans
    self.wiki_users = wiki_users
    self.path = path
    # load the scanned file without running the OCR
    self.doc = Document()
    self.doc.fromFile(folderPath=self.scandir, file=path, local=True, withOcr=False)
    # the form must exist before the filter: it creates self.progressbar
    self.setup_form()
    self.upload_log_filter = UploadLogFilter(self.progressbar)
    for handler in (self.solution.stdout_handler, self.solution.stderr_handler):
        self.webserver.logger.addHandler(handler)
    # let every logger known so far feed the counting filter
    for name in logging.Logger.manager.loggerDict:
        known_logger = logging.getLogger(name)
        known_logger.propagate = True
        known_logger.addFilter(self.upload_log_filter)

    self.uploaded = False

run_ocr() async

run the optical character recognition

Source code in scan/upload.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
async def run_ocr(self):
    """
    Run the optical character recognition for the document of this form.

    The recognized text is put into the OCR text area and the log
    statistics are shown in the solution's log view. Any exception is
    handed to the webserver's exception handler.
    """
    try:
        log_filter = self.upload_log_filter
        log_filter.reset(1, 150)
        timer = TimeMessage(f"OCR for {self.doc.name} ({self.doc.size})")
        ui.notify(timer)
        # getOcrText is awaited via run.io_bound
        self.ocr_text_area.value = await run.io_bound(self.doc.getOcrText)
        log_filter.show_stats(self.solution.log_view)
        ui.notify(timer.done())
        self.update_progress(100)
    except Exception as ex:
        self.webserver.handle_exception(ex)

run_upload() async

actually do the upload

Source code in scan/upload.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
async def run_upload(self):
    """
    Upload the document of this form to the selected wiki.

    After the upload the log statistics are shown in the solution's log
    view, the form is marked as uploaded and the page link is refreshed.
    Any exception is handed to the solution's exception handler.
    """
    try:
        uploadDoc = self.doc
        log_filter = self.upload_log_filter
        log_filter.reset(8, 1)
        timer = TimeMessage(f"uploading {uploadDoc.name} ({uploadDoc.size})")
        ui.notify(timer)
        # uploadFile is awaited via run.io_bound with the selected wiki id
        await run.io_bound(uploadDoc.uploadFile, self.wiki_user_select.value)
        log_filter.show_stats(self.solution.log_view)
        ui.notify(timer.done())
        # update() picks the link style from the uploaded flag
        self.uploaded = True
        self.update()
    except Exception as ex:
        self.solution.handle_exception(ex)

setup_form()

setup the upload form

Source code in scan/upload.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def setup_form(self):
    """
    setup the upload form

    The form is a splitter: the part before the split holds the progress
    bar, the upload/ocr buttons and the inputs for page title, wiki,
    categories and topic; the part after the split holds the OCR text
    area, the height slider and the embedded PDF of the scanned file.
    """
    with ui.splitter(value=30).classes("h-fit").style("flex:1") as self.splitter:
        with self.splitter.before:
            self.progressbar = NiceguiProgressbar(
                100, "processing page", "steps"
            )
            with ui.card().tight():
                with ui.card_section():
                    self.submit = ui.button("upload", on_click=self.run_upload)
                    self.ocr = ui.button("ocr", on_click=self.run_ocr)
                with ui.card_section():
                    # changes of the title are written to doc.pageTitle
                    # and trigger a refresh of the page link
                    self.page_title = (
                        ui.input("pagetitle", on_change=self.update)
                        .props("size=80")
                        .bind_value_to(self.doc, "pageTitle")
                    )
                    # starts with the red link style
                    self.page_link = ui.html("pagelink").style(self.red_link)
                    wiki_selection = list(sorted(self.wiki_users.keys()))
                    self.wiki_user_select = self.solution.add_select(
                        title="Wiki",
                        selection=wiki_selection,
                        on_change=self.update,
                    )
                    # url and link markup for the scanned file
                    (
                        self.scanned_file_url,
                        self.scanned_file_link,
                    ) = self.scans.get_file_link(self.path)
                    self.scanned_file_link_view = ui.html(
                        self.scanned_file_link
                    ).style(self.blue_link)
                    current_date = datetime.now()
                    # the current year is the default category
                    self.categories = ui.input(
                        "categories", value=str(current_date.year)
                    ).bind_value_to(self.doc, "categories")
                    self.topic = ui.input(
                        "topic", value="OCRDocument"
                    ).bind_value_to(self.doc, "topic")
        with self.splitter.after as self.pdf_container:
            with ui.element("div").classes("w-full h-full"):
                # NOTE(review): props strings look whitespace-separated
                # elsewhere - confirm that "rows=25;cols=80" sets both props
                self.ocr_text_area = (
                    ui.textarea("Text")
                    .props("clearable")
                    .props("rows=25;cols=80")
                    .bind_value_to(self.doc, "ocrText")
                )
                ui.separator()
                # the slider value is the height of the pdf viewer in rem
                self.rem_slider = ui.slider(
                    min=10,
                    max=100,
                    step=1,
                    value=self.rem_value,
                    on_change=self.update_pdf_viewer_height,
                )
                # Embedding the PDF within a div that takes the full width and height
                pdf_html = f"""<embed src="{self.scanned_file_url}" type="application/pdf" style="width:100%; height:100%;">"""
                self.pdf_viewer = ui.html(pdf_html).classes("w-full h-[48rem]")

to_document(scandir, withOcr=False)

convert my content to a document

Source code in scan/upload.py
210
211
212
213
214
215
216
217
218
219
220
def to_document(self, scandir, withOcr: bool = False):
    """
    convert my content to a document

    Args:
        scandir: the folder the scanned file is read from
        withOcr (bool): if True let the document run the OCR itself,
            otherwise the text of the OCR text area is taken over

    Returns:
        the new document for the scanned file of this form
    """
    doc = Document()
    # the scanned file name is kept in self.path - the form has no
    # scanned_file input, so self.scanned_file.value could only fail
    doc.fromFile(scandir, self.path, local=True, withOcr=withOcr)
    doc.wikiUser = self.wiki_user_select.value
    doc.categories = self.categories.value
    if not withOcr:
        doc.ocrText = self.ocr_text_area.value
    return doc

update()

update the page_link depending on the page title or the selected wiki

Source code in scan/upload.py
196
197
198
199
200
201
202
203
204
205
206
207
208
def update(self):
    """
    Refresh the page link from the current page title and selected wiki.

    Nothing is changed when the selected wiki id is not one of the known
    wiki users. The link is styled blue once the upload has been done and
    red before.
    """
    wiki_id = self.wiki_user_select.value
    page_title = self.page_title.value
    if wiki_id not in self.wiki_users:
        return
    wiki_user = self.wiki_users[wiki_id]
    wiki_url = f"{wiki_user.url}{wiki_user.scriptPath}"
    self.page_link.content = Link.create(
        f"{wiki_url}/index.php/{page_title}", page_title
    )
    if self.uploaded:
        link_style = self.blue_link
    else:
        link_style = self.red_link
    self.page_link.style(link_style)

update_pdf_viewer_height(e) async

Update the height of the PDF viewer based on the slider value.

Source code in scan/upload.py
184
185
186
187
188
189
190
191
async def update_pdf_viewer_height(self, e):
    """
    Update the height of the PDF viewer based on the slider value.

    Args:
        e: the slider change event; ``e.value`` is the new height in rem
    """
    self.rem_value = e.value
    new_height = f"h-[{self.rem_value}rem]"  # Calculate the new height in rem
    # classes is the element's styling call, not a plain attribute:
    # assigning a string to it would never reach the browser
    self.pdf_viewer.classes(replace=f"w-full {new_height}")
    self.splitter.update()

UploadLogFilter

Bases: Filter

logging filter for the Uploadform

Source code in scan/upload.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class UploadLogFilter(logging.Filter):
    """
    logging filter for the Uploadform

    Counts the log records per module, advances a progress bar every
    ``per_log`` records and suppresses the records themselves unless
    they report an error.
    """

    def __init__(self, progressbar):
        """
        constructor

        Args:
            progressbar: the progress bar to drive; it must offer
                ``reset()`` and ``update(step)``
        """
        super().__init__()
        self.progressbar = progressbar
        self.reset()

    def reset(self, progress_step: int = 1, per_log: int = 250):
        """
        reset the progress bar and the log statistics

        Args:
            progress_step (int): how far to advance the progress bar at once
            per_log (int): the number of log records per progress bar advance
        """
        self.progressbar.reset()
        self.progress_step = progress_step
        self.per_log = per_log
        self.module_counter = Counter()

    def show_stats(self, log_view):
        """
        show the number of log records per module

        Args:
            log_view: the view to push the statistics to; nothing is
                shown if it is not set
        """
        stats = self.module_counter.most_common()  # Get the most common modules
        stats_str = "\n".join([f"{module}: {count} logs" for module, count in stats])
        if log_view:
            log_view.push(stats_str)

    def filter(self, record):
        """
        count the given record and decide whether it is logged

        Args:
            record: the log record to check

        Returns:
            bool: True for error records (they are still logged),
            False for all other records (they are only counted)
        """
        self.module_counter[record.module] += 1
        if sum(self.module_counter.values()) % self.per_log == 0:
            self.progressbar.update(
                self.progress_step
            )  # Increment progress bar by stepsize
        msg = str(record.msg).lower()
        # make sure errors are still shown: a filter result of True lets
        # the record pass, False prevents the standard logging
        return record.levelno >= logging.ERROR or "error" in msg

show_stats(log_view)

Source code in scan/upload.py
52
53
54
55
56
57
58
def show_stats(self, log_view):
    """
    Push the number of log records per module to the given log view.

    Args:
        log_view: the view to push the statistics to; nothing is pushed
            if it is not set
    """
    if not log_view:
        return
    # one line per module, the module with the most records first
    lines = (
        f"{module}: {count} logs"
        for module, count in self.module_counter.most_common()
    )
    log_view.push("\n".join(lines))

version

Created on 2022-02-16

@author: wf

Version dataclass

Bases: object

Version handling for scan2wiki

Source code in scan/version.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@dataclass
class Version:
    """
    Version and project information for scan2wiki
    """

    # what this is
    name = "scan2wiki"
    version = scan.__version__
    description = "Scan to Wiki by watching a scan folder"

    # when it was created and last changed
    date = "2021-12-20"
    updated = "2024-02-22"

    authors = "Wolfgang Fahl"

    # where to find documentation, discussions and the source code
    doc_url = "https://wiki.bitplan.com/index.php/scan2wiki"
    chat_url = "https://github.com/WolfgangFahl/scan2wiki/discussions"
    cm_url = "https://github.com/WolfgangFahl/scan2wiki"

    license = """Copyright 2023 contributors. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied."""

    # composed from the values above
    longDescription = f"""{name} version {version}
{description}

  Created by {authors} on {date} last updated {updated}"""

webcam

Created on 2023-11-16

@author: wf

WebcamForm

allow scanning pictures from a webcam

Source code in scan/webcam.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class WebcamForm:
    """
    allow scanning pictures from a webcam

    A shot of the webcam can be saved to the scan directory, scanned for
    a barcode and the barcode can be looked up on Amazon to add the
    product found to the list of products.
    """

    def __init__(self, webserver, default_url: str):
        """
        construct me

        Args:
            webserver: the webserver this form belongs to; its scandir,
                debug flag, log view and exception handler are used
            default_url (str): the base url of the webcam
        """
        self.webserver = webserver
        self.scandir = webserver.scandir
        self.url = default_url
        self.shot_url = f"{self.url}/shot.jpg"
        self.image_path = None
        self.amazon = Amazon(self.webserver.debug)
        self.product = None
        self.gtin = None
        self.products = Products()  # Initialize the Products instance
        self.products.load_from_json()  # Load existing products
        self.setup_form()
        self.update_product_grid()

    def notify(self, msg):
        """
        show the given message as a notification and in the log view

        Args:
            msg: the message to show
        """
        ui.notify(msg)
        if self.webserver.log_view:
            self.webserver.log_view.push(msg)

    async def run_scan(self):
        """
        Start the scan process in the background.

        The saved image becomes the current image and is shown in the
        preview.
        """
        # NOTE(review): task_handler is not assigned anywhere in this
        # class - presumably it is provided from outside; confirm
        _, scan_coro = self.task_handler.execute_in_background(self.save_webcam_shot)
        self.image_path, msg = await scan_coro()
        self.notify(msg)
        self.update_preview(self.image_path)

    def save_webcam_shot(self) -> "tuple[str, str]":
        """
        Fetches an image from the webcam URL and saves it with a timestamp in the specified directory.

        Exceptions are handed to the webserver's exception handler and
        reported in the returned message.

        Returns:
            tuple[str, str]: The file name of the saved webcam image
            (empty if nothing was saved) and a message describing the
            result.
        """
        image_file_name = ""
        msg = ""
        try:
            shot_url = f"{self.url}/shot.jpg"
            response = requests.get(shot_url)
            if response.status_code == 200:
                # Ensure the scandir directory exists
                Path(self.scandir).mkdir(parents=True, exist_ok=True)
                image_data = response.content
                # Get current date and time without timezone information
                timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
                # Define the full path to save the image
                file_name = f"webcam_{timestamp}.jpg"
                image_file_path = Path(self.scandir) / file_name
                # Write the image data to the file system
                with open(image_file_path, "wb") as image_file:
                    image_file.write(image_data)
                # only report the file name once the file has been written
                image_file_name = file_name
                msg = f"Saved webcam image to {image_file_path}"
            else:
                msg = f"Failed to fetch the webcam image. Status code: {response.status_code}"

        except Exception as ex:
            self.webserver.handle_exception(ex)
            # the caller unpacks file name and message, so both must be
            # set even when fetching or writing failed
            image_file_name = ""
            msg = f"Failed to save the webcam image: {ex}"

        return image_file_name, msg

    def setup_form(self):
        """
        Setup the webcam form
        """
        # Button to refresh or scan the video stream
        self.scan_button = ui.button("Scan", on_click=self.run_scan)
        self.barcode_button = ui.button("Barcode", on_click=self.scan_barcode)
        self.lookup_button = ui.button("Lookup", on_click=self.lookup_gtin)
        self.add_button = ui.button("add", on_click=self.add_product)
        self.webcam_input = ui.input(value=self.url)
        self.image_link = ui.html().style(Link.blue)
        # the input and self.gtin are kept in sync
        self.gtin_input = ui.input("gtin", value=self.gtin).bind_value(self, "gtin")
        self.barcode_results = ui.html("")
        self.product_grid = ListOfDictsGrid()
        # HTML container for the webcam snap shot
        self.preview = ui.html()

    def update_product_grid(self):
        """
        Update the product grid with the current products.
        """
        lod = self.products.get_aggrid_lod()
        self.product_grid.load_lod(lod)

    async def add_product(self):
        """
        add the product found by the last lookup to the products

        Nothing is added when there has been no successful lookup yet.
        """
        if self.product is None:
            # the add button may be clicked before any lookup found a product
            self.notify("No product to add - please lookup a gtin first")
            return
        self.products.add_product(self.product)
        self.products.save_to_json()  # Save the updated product list
        self.update_product_grid()  # Update the product grid

    def lookup_gtin(self):
        """
        lookup the  global trade identification number e.g. ean

        The first product found on Amazon becomes the current product;
        the result is shown as a notification and in the barcode results.
        """
        if not self.gtin:
            return
        # Perform Amazon lookup for gtin
        amazon_products = self.amazon.lookup_products(self.gtin)
        if amazon_products:
            # only the first product found is used
            self.product = amazon_products[0]
            self.product.gtin = self.gtin
            product_html = self.product.as_html()
            product_details = product_html
            msg = f"found {self.product.title} for gtin {self.gtin}"
        else:
            msg = f"No matching Amazon product found for gtin {self.gtin}."
            product_details = f"<p>{msg}</p>"

        html_markup = f"<p>Code: {self.gtin}, {product_details}</p>"
        self.notify(msg)
        self.barcode_results.content = html_markup

    async def scan_barcode(self):
        """
        Scan for barcodes in the most recently saved webcam image.

        The code of the first barcode found is put into the gtin input;
        exceptions are handed to the webserver's exception handler.
        """
        try:
            if self.image_path:
                barcode_path = f"{self.scandir}/{self.image_path}"
                barcode_list = Barcode.decode(barcode_path)
                if barcode_list and len(barcode_list) >= 1:
                    barcode = barcode_list[0]
                    self.gtin_input.value = barcode.code
                    msg = f"barcode {barcode.code} type {barcode.type} found"
                else:
                    msg = "No barcodes found."
            else:
                msg = "No image to scan for barcodes."
            self.notify(msg)
        except Exception as ex:
            self.webserver.handle_exception(ex)

    def update_preview(self, image_path: str = None):
        """
        Update the preview to show the given image.

        Args:
            image_path (str): the name of the image below /files; if it
                is not set the preview shows "Loading..."
        """
        if image_path:
            url = f"/files/{image_path}"
            html_markup = f"""<img src="{url}" style="width: 100%; height: auto;" />"""
            self.image_link.content = Link.create(url, image_path)
            self.preview.content = html_markup
        else:
            self.preview.content = "Loading..."

__init__(webserver, default_url)

construct me

Source code in scan/webcam.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, webserver, default_url: str):
    """
    Create the webcam form.

    Args:
        webserver: the webserver this form belongs to; its scandir and
            debug flag are used
        default_url (str): the base url of the webcam
    """
    self.webserver = webserver
    self.scandir = webserver.scandir
    # webcam base url and the url of a single shot
    self.url = default_url
    self.shot_url = f"{self.url}/shot.jpg"
    # nothing scanned or looked up yet
    self.image_path = None
    self.gtin = None
    self.product = None
    self.amazon = Amazon(self.webserver.debug)
    # start with the products stored so far
    products = Products()
    self.products = products
    products.load_from_json()
    self.setup_form()
    self.update_product_grid()

add_product() async

add the given product

Source code in scan/webcam.py
112
113
114
115
116
117
118
async def add_product(self):
    """
    add the product found by the last lookup to the products

    The updated product list is saved and the product grid is refreshed.
    Nothing is added when there has been no successful lookup yet.
    """
    if self.product is None:
        # the add button may be clicked before any lookup found a product
        self.notify("No product to add - please lookup a gtin first")
        return
    self.products.add_product(self.product)
    self.products.save_to_json()  # Save the updated product list
    self.update_product_grid()  # Update the product grid

lookup_gtin()

lookup the global trade identification number e.g. ean

Source code in scan/webcam.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def lookup_gtin(self):
    """
    Look up the current global trade identification number (e.g. ean) on Amazon.

    Without a gtin nothing happens. Otherwise the first product found
    becomes the current product; the outcome is shown as a notification
    and as markup in the barcode results.
    """
    if not self.gtin:
        return
    matches = self.amazon.lookup_products(self.gtin)
    if not matches:
        msg = f"No matching Amazon product found for gtin {self.gtin}."
        product_details = f"<p>{msg}</p>"
    else:
        # only the first match is used
        self.product = matches[0]
        self.product.gtin = self.gtin
        product_details = self.product.as_html()
        msg = f"found {self.product.title} for gtin {self.gtin}"

    self.notify(msg)
    self.barcode_results.content = f"<p>Code: {self.gtin}, {product_details}</p>"

run_scan() async

Start the scan process in the background.

Source code in scan/webcam.py
46
47
48
49
50
51
52
53
async def run_scan(self):
    """
    Save a webcam shot in the background and show it.

    The saved image becomes the current image, the result message is
    shown as a notification and the preview is updated.
    """
    background = self.task_handler.execute_in_background(self.save_webcam_shot)
    # only the second part of the result is needed: it is called and awaited
    _task, wait_for_shot = background
    shot = await wait_for_shot()
    self.image_path, msg = shot
    self.notify(msg)
    self.update_preview(self.image_path)

save_webcam_shot()

Fetches an image from the webcam URL and saves it with a timestamp in the specified directory.

Returns:

Name Type Description
str str

The file name of the saved webcam image, returned together with a message that describes the result or reports that the fetch failed.

Source code in scan/webcam.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def save_webcam_shot(self) -> "tuple[str, str]":
    """
    Fetches an image from the webcam URL and saves it with a timestamp in the specified directory.

    Exceptions are handed to the webserver's exception handler and
    reported in the returned message.

    Returns:
        tuple[str, str]: The file name of the saved webcam image (empty
        if nothing was saved) and a message describing the result.
    """
    image_file_name = ""
    msg = ""
    try:
        shot_url = f"{self.url}/shot.jpg"
        response = requests.get(shot_url)
        if response.status_code == 200:
            # Ensure the scandir directory exists
            Path(self.scandir).mkdir(parents=True, exist_ok=True)
            image_data = response.content
            # Get current date and time without timezone information
            timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
            # Define the full path to save the image
            file_name = f"webcam_{timestamp}.jpg"
            image_file_path = Path(self.scandir) / file_name
            # Write the image data to the file system
            with open(image_file_path, "wb") as image_file:
                image_file.write(image_data)
            # only report the file name once the file has been written
            image_file_name = file_name
            msg = f"Saved webcam image to {image_file_path}"
        else:
            msg = f"Failed to fetch the webcam image. Status code: {response.status_code}"

    except Exception as ex:
        self.webserver.handle_exception(ex)
        # the caller unpacks file name and message, so both must be set
        # even when fetching or writing failed
        image_file_name = ""
        msg = f"Failed to save the webcam image: {ex}"

    return image_file_name, msg

scan_barcode() async

Scan for barcodes in the most recently saved webcam image and put the code of the first barcode found into the gtin input.

Source code in scan/webcam.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def scan_barcode(self):
    """
    Scan the most recently saved webcam image for barcodes.

    The code of the first barcode found is put into the gtin input and
    the outcome is shown as a notification. Any exception is handed to
    the webserver's exception handler.
    """
    try:
        if not self.image_path:
            msg = "No image to scan for barcodes."
        else:
            found = Barcode.decode(f"{self.scandir}/{self.image_path}")
            if found and len(found) > 0:
                # only the first barcode is used
                barcode = found[0]
                self.gtin_input.value = barcode.code
                msg = f"barcode {barcode.code} type {barcode.type} found"
            else:
                msg = "No barcodes found."
        self.notify(msg)
    except Exception as ex:
        self.webserver.handle_exception(ex)

setup_form()

Setup the webcam form

Source code in scan/webcam.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def setup_form(self):
    """
    Setup the webcam form

    Creates the buttons for scanning, barcode detection, product lookup
    and adding a product, the inputs for the webcam url and the gtin,
    and the elements showing the image link, the lookup result, the
    products and the preview of the webcam shot.
    """
    # Button to refresh or scan the video stream
    self.scan_button = ui.button("Scan", on_click=self.run_scan)
    self.barcode_button = ui.button("Barcode", on_click=self.scan_barcode)
    self.lookup_button = ui.button("Lookup", on_click=self.lookup_gtin)
    self.add_button = ui.button("add", on_click=self.add_product)
    # shows the webcam url given at construction time
    self.webcam_input = ui.input(value=self.url)
    self.image_link = ui.html().style(Link.blue)
    # bound to self.gtin
    self.gtin_input = ui.input("gtin", value=self.gtin).bind_value(self, "gtin")
    self.barcode_results = ui.html("")
    self.product_grid = ListOfDictsGrid()
    # HTML container for the webcam snap shot
    self.preview = ui.html()

update_preview(image_path=None)

Update the preview to show the given image, or "Loading..." when no image is given.

Source code in scan/webcam.py
163
164
165
166
167
168
169
170
171
172
173
def update_preview(self, image_path: str = None):
    """
    Show the given image in the preview.

    Args:
        image_path (str): the name of the image below /files; if it is
            not set the preview shows "Loading..." instead
    """
    if not image_path:
        self.preview.content = "Loading..."
        return
    url = f"/files/{image_path}"
    html_markup = f"""<img src="{url}" style="width: 100%; height: auto;" />"""
    # link to the image first, then the image itself
    self.image_link.content = Link.create(url, image_path)
    self.preview.content = html_markup

update_product_grid()

Update the product grid with the current products.

Source code in scan/webcam.py
105
106
107
108
109
110
def update_product_grid(self):
    """
    Reload the product grid from the current products.
    """
    rows = self.products.get_aggrid_lod()
    grid = self.product_grid
    grid.load_lod(rows)