Skip to content

play-chess-with-a-webcam API Documentation

args

Created on 2019-12-08

@author: wf

Args

Command Line argument handling

Source code in pcwawc/args.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class Args:
    """Command Line argument handling"""

    def __init__(self, description):
        self.parser = argparse.ArgumentParser(description=description)
        self.parser.add_argument(
            "--input", default="0", help="Manually set the input device."
        )
        self.parser.add_argument(
            "--autowarp",
            action="store_true",
            help="automatically find and warp chessboard",
        )

        self.parser.add_argument("--black", default=None, help="PGN Black header")

        self.parser.add_argument(
            "--debug", action="store_true", help="show debug output"
        )

        self.parser.add_argument(
            "--detector", default="simple8x8", help="move detector to be used"
        )

        self.parser.add_argument("--event", default=None, help="PGN Event header")

        self.parser.add_argument(
            "--fen", default=None, help="Forsyth–Edwards Notation to start with"
        )

        self.parser.add_argument("--game", default=None, help="game to initialize with")

        self.parser.add_argument(
            "--lichess", action="store_true", help="activate lichess integration"
        )

        self.parser.add_argument(
            "--nomoves", action="store_true", help="do not show each individual move"
        )

        self.parser.add_argument(
            "--nowarp",
            action="store_true",
            help="chessboard vision is already squared e.g. recorded that way",
        )

        self.parser.add_argument("--round", default=None, help="PGN Round header")

        self.parser.add_argument(
            "--rotation",
            type=int,
            default=0,
            help="rotation of chessboard 0,90,180 or 270",
        )

        self.parser.add_argument("--site", default=None, help="PGN Site header")

        self.parser.add_argument(
            "--startframe",
            type=int,
            default=0,
            help="video frame at which to start detection",
        )

        self.parser.add_argument(
            "--speedup",
            type=int,
            default=1,
            help="detection speedup - higher speedup means less precision",
        )

        self.parser.add_argument("--white", default=None, help="PGN White header")

        self.parser.add_argument("--warp", default="[]", help="warp points")

        self.parser.add_argument(
            "--distance",
            type=int,
            default=5,
            help="detection pixel distance - number of pixels analyzed i square of this",
        )

        self.parser.add_argument(
            "--step",
            type=int,
            default=3,
            help="detection pixel steps - distance*step is the grid size being analyzed",
        )

        pass

    def parse(self, argv):
        self.args = self.parser.parse_args(argv)
        self.args.warpPointList = ast.literal_eval(self.args.warp)
        return self.args

board

Board

Bases: object

This class is used to hold the state of a chessboard with pieces positions and the current player's color which player needs to play. It uses the python-chess library by default

Source code in pcwawc/board.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@implementer(IChessBoard)
class Board(object):
    """This class is used to hold the state of a chessboard with pieces positions and the current player's color which player needs to play. It uses the python-chess library by default"""

    debug = False
    EMPTY_FEN = "8/8/8/8/8/8/8/8 w - -"
    START_FEN = chess.STARTING_BOARD_FEN

    # initialize the board
    def __init__(self, args=None):
        self.chessboard = chess.Board()
        self.fieldsByAn = {}
        self.args = args
        self.debug = self.args is not None and self.args.debug
        self.game = WebCamGame.fromArgs(args)
        self.updateFen()
        self.game.update(self)

        self.fields = [[0 for x in range(Field.rows)] for y in range(Field.cols)]
        for row in range(Field.rows):
            for col in range(Field.cols):
                field = Field(self, row, col)
                self.fieldsByAn[field.an] = field
                self.fields[col][row] = field

    def fieldAt(self, row, col):
        return self.fields[col][row]

    def genSquares(self):
        for field in self.fieldsByAn.values():
            yield field

    def divideInSquares(self, width, height):
        # interpolate the centers of the 8x8 fields from a squared image
        fieldHeight = height / Field.rows
        fieldWidth = width / Field.cols
        for field in self.genSquares():
            field.setRect(width, height, fieldWidth, fieldHeight)

    def fieldStateCounts(self):
        # there are 6 different FieldStats
        counts = [0, 0, 0, 0, 0, 0]
        for field in self.fieldsByAn.values():
            fieldState = field.getFieldState()
            counts[fieldState] = counts[fieldState] + 1
        return counts

    def piecesOfColor(self, color):
        count = 0
        for field in self.fieldsByAn.values():
            piece = field.getPiece()
            if piece is not None and piece.color == color:
                count = count + 1
        return count

    # perform the given move
    def performMove(self, move):
        fromCell, toCell = move
        return self.ucimove(fromCell + toCell)

    def ucimove(self, ucimove):
        move = Move.from_uci(ucimove.lower())
        return self.move(move)

    def move(self, move):
        """perform the given move"""
        try:
            if self.debug:
                print("trying to perform move %s on" % (str(move)))
                print("%s" % (self.unicode()))
            san = self.chessboard.san(move)
        except Exception as e:
            if self.debug:
                print("failed with error: %s" % (str(e)))
            return None

        self.chessboard.push(move)
        self.game.move(self)
        self.updateFen()
        if self.args is not None and not self.args.nomoves:
            print("move %s" % (san))
            print("%s" % (self.unicode()))
        return san

    def takeback(self):
        if self.game.moveIndex > 0:
            self.game.moveIndex = self.game.moveIndex - 1
            self.chessboard.pop()
            self.updateFen()
            return True
        else:
            return False

    def lockGame(self):
        # @TODO implement locking of a saved game to make it immutable
        gameid = self.game.gameid
        self.game.locked = True
        return gameid

    # set my board and game from the given pgn
    def setPgn(self, pgn):
        self.game.pgn = pgn
        pgnIo = io.StringIO(pgn)
        game = chess.pgn.read_game(pgnIo)
        if game is None:
            # TODO log a warning
            return
        self.chessboard = game.board()
        for move in game.mainline_moves():
            self.chessboard.push(move)
        self.updateFen()

    def updatePieces(self, fen):
        self.chessboard = chess.Board(fen)
        self.updateFen()

    # get my fen description
    def updateFen(self):
        self.fen = self.chessboard.board_fen()
        self.game.fen = self.fen
        return self.fen

    # get my unicode representation
    def unicode(self):
        unicode = self.chessboard.unicode()
        return unicode

    def changeToMove(self, change):
        sq1, sq2 = change
        """ convert the given change in the physical board to a move """
        for move in self.chessboard.legal_moves:
            movestr = str(move)
            if sq1 + sq2 == movestr or sq2 + sq1 == movestr:
                return move
        return None

move(move)

perform the given move

Source code in pcwawc/board.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def move(self, move):
    """perform the given move"""
    try:
        if self.debug:
            print("trying to perform move %s on" % (str(move)))
            print("%s" % (self.unicode()))
        san = self.chessboard.san(move)
    except Exception as e:
        if self.debug:
            print("failed with error: %s" % (str(e)))
        return None

    self.chessboard.push(move)
    self.game.move(self)
    self.updateFen()
    if self.args is not None and not self.args.nomoves:
        print("move %s" % (san))
        print("%s" % (self.unicode()))
    return san

boarddetector

BoardDetector

Bases: Observable

detect a chess board's state from the given image

Source code in pcwawc/boarddetector.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@implementer(IMoveDetector)
class BoardDetector(Observable):
    """detect a chess board's state from the given image"""

    frameDebug = False

    def __init__(self):
        """construct me"""
        # make me observable
        super(BoardDetector, self).__init__()
        pass

    def setup(self, name, vision):
        self.name = name
        self.vision = vision
        self.board = vision.board
        self.video = vision.video
        self.hsv = None
        self.debug = False

    def sortByFieldState(self):
        # get a dict of fields sorted by field state
        sortedByFieldState = sorted(
            self.board.fieldsByAn.values(), key=lambda field: field.getFieldState()
        )
        counts = self.board.fieldStateCounts()
        sortedFields = {}
        fromIndex = 0
        for fieldState in FieldState:
            toIndex = fromIndex + counts[fieldState]
            sortedFields[fieldState] = sortedByFieldState[fromIndex:toIndex]
            fromIndex = toIndex
        return sortedFields

    def analyzeFields(self, image, grid, roiLambda):
        for field in self.board.genSquares():
            field.divideInROIs(grid, roiLambda)
            for roi in field.rois:
                roi.analyze(image)

    def analyzeColors(self, image, distance=3, step=1):
        self.hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        for field in self.board.genSquares():
            field.analyzeColor(image, self.hsv, distance, step)

    def onChessBoardImage(self, imageEvent):
        cbImageSet = imageEvent.cbImageSet
        vision = cbImageSet.vision
        args = vision.args
        distance = args.distance
        step = args.step
        return self.analyze(cbImageSet, distance, step)

    # analyze the given image
    def analyze(self, cbImageSet, distance=3, step=1):
        frameIndex = cbImageSet.frameIndex
        cbWarped = cbImageSet.cbWarped
        image = cbWarped.image
        self.board.divideInSquares(cbWarped.width, cbWarped.height)
        self.analyzeColors(image, distance, step)
        sortedFields = self.sortByFieldState()

        if self.debug:
            overlay = image.copy()

        for fieldState, fields in sortedFields.items():
            for field in fields:
                l = field.luminance
                if BoardDetector.frameDebug:
                    print(
                        "frame %5d: %s luminance: %3.0f ± %3.0f (%d) rgbColorKey: %3.0f colorKey: %.0f"
                        % (
                            frameIndex,
                            field.an,
                            l.mean(),
                            l.standard_deviation(),
                            l.n,
                            field.rgbColorKey,
                            field.colorKey,
                        )
                    )
                if self.debug:
                    field.drawDebug(self.video, overlay, fieldState)
                    alpha = 0.6  # Transparency factor.
                    # Following line overlays transparent rectangle over the image
                    image_new = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)
                    image = image_new
                    cbImageSet.cbDebug = ChessBoardImage(image, "debug")
        return image

__init__()

construct me

Source code in pcwawc/boarddetector.py
17
18
19
20
21
def __init__(self):
    """construct me"""
    # make me observable
    super(BoardDetector, self).__init__()
    pass

boardfinder

BoardFinder

Bases: object

find a chess board in the given image

Source code in pcwawc/boardfinder.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
class BoardFinder(object):
    """find a chess board in the given image"""

    debug = False
    black = (0, 0, 0)
    white = (255, 255, 255)
    darkGrey = (256 // 3, 256 // 3, 256 / 3)
    lightGrey = (256 * 2 // 3, 256 * 2 // 3, 256 * 2 // 3)

    def __init__(self, image, video=None):
        """construct me from the given input image"""
        if video is None:
            video = Video()
        self.video = video
        self.image = image
        # guess the topleft color
        self.topleft = chess.WHITE
        self.height, self.width = self.image.shape[:2]

    @staticmethod
    def centerXY(xylist):
        x, y = zip(*xylist)
        l = len(x)
        return sum(x) / l, sum(y) / l

    @staticmethod
    def sortPoints(xylist):
        """sort points clockwise see https://stackoverflow.com/a/59115565/1497139"""
        cx, cy = BoardFinder.centerXY(xylist)
        xy_sorted = sorted(xylist, key=lambda x: math.atan2((x[1] - cy), (x[0] - cx)))
        return xy_sorted

    def findOuterCorners(self, searchWidth=640):
        """find my outer corners as limited by the OpenCV findChessBoard algorithm - to be later expanded"""
        found = self.findCorners(self.image, limit=1, searchWidth=searchWidth)
        # we expected to find a board
        if len(found) != 1:
            raise Exception("no corners found")
        chesspattern = next(iter(found))
        corners = found[chesspattern]
        corners.calcPolygons(0, Corners.safetyMargin)
        corners.calcTrapez()
        return corners

    def preparefindCorners(self, image, searchWidth=640):
        sw = self.width
        sh = self.height
        if sw > searchWidth:
            sw = searchWidth
            sh = self.height * sw // self.width
        searchimage = cv2.resize(self.image, (sw, sh))
        if BoardFinder.debug:
            print(
                "BoardFinder for %dx%d image resized to %dx%d"
                % (self.width, self.height, sw, sh)
            )

        gray = cv2.cvtColor(searchimage, cv2.COLOR_BGR2GRAY)
        fullSizeGray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
        return gray, fullSizeGray

    def findCorners(self, image, limit=1, searchWidth=640):
        """start finding the chessboard with the given limit and the given maximum width of the search image"""
        startt = timer()
        gray, fullSizeGray = self.preparefindCorners(image, searchWidth)
        self.found = {}
        for chesspattern in Corners.genChessPatterns():
            corners = Corners(chesspattern, self.video)
            if corners.findPattern(gray) and corners.findPattern(fullSizeGray):
                corners.sort()
                self.found[chesspattern] = corners
            if len(self.found) >= limit:
                break
        endt = timer()
        if BoardFinder.debug:
            print("found %d patterns in %.1f s" % (len(self.found), (endt - startt)))
        return self.found

    def findChessBoard(self, image, title):
        """find a chess board in the given image and return the trapez polygon for it"""
        corners = self.findOuterCorners()
        histograms = self.getHistograms(image, title, corners)
        self.expand(image, title, histograms, corners)
        return corners

    def fieldColor(self, pos):
        """determine the field color at the given position"""
        row, col = pos
        # the color of the topleft field might be different then A8=WHITE when we have a rotated image
        oddeven = 1 if self.topleft == chess.WHITE else 0
        # calculate the chessboard color of the given position based on the topleft color
        color = chess.WHITE if (col + row) % 2 == oddeven else chess.BLACK
        return color

    def maskPolygon(self, image, polygon):
        """mask the given image with the given polygon"""
        mask = self.video.getEmptyImage(image)
        cv2.fillConvexPoly(mask, polygon, BoardFinder.white)
        masked = self.video.maskImage(image, mask)
        return masked

    def maskCornerPolygons(self, image, corners, filterColor):
        """mask the polygons derived from the given corner points"""
        mask = self.video.getEmptyImage(image)
        polygons = corners.polygons[Corners.safetyMargin]
        for pos, polygon in polygons.items():
            posColor = self.fieldColor(pos)
            if not posColor == filterColor:
                self.drawPolygon(
                    mask, pos, polygon, BoardFinder.white, BoardFinder.white
                )
        # if BoardFinder.debug:
        #    cv2.imshow("mask",mask)
        #    cv2.waitKey(1000)
        masked = self.video.maskImage(image, mask)
        return masked

    def getHistograms(self, image, title, corners):
        """get the two histograms for the given corners we don't no what the color of the topleft corner is so we start with a guess"""
        histograms = {}
        for filterColor in (True, False):
            imageCopy = image.copy()
            masked = self.maskCornerPolygons(imageCopy, corners, filterColor)
            if BoardFinder.debug:
                prefix = "masked-O-" if filterColor else "masked-X-"
                corners.writeDebug(masked, title, prefix)
            histograms[filterColor] = Histogram(masked, histRange=(1, 256))

        # do we need to fix our guess?
        # is the mean color of black (being filtered) higher then when white is filtered?
        if histograms[chess.BLACK].color > histograms[chess.WHITE].color:
            self.topleft = chess.BLACK
            # swap entries
            tmp = histograms[chess.BLACK]
            histograms[chess.BLACK] = histograms[chess.WHITE]
            histograms[chess.WHITE] = tmp
        return histograms

    def getColorFiltered(self, image, histograms, title, corners):
        """get color filtered images based on the given histograms"""
        colorFiltered = {}
        colorMask = {}
        for filterColor in (chess.WHITE, chess.BLACK):
            histogram = histograms[filterColor]
            imageCopy = image.copy()
            lowerColor, upperColor = histogram.range(1.0)
            # make sure the colors are numpy arrays
            lowerColor = np.array(lowerColor, dtype=np.uint8)
            upperColor = np.array(upperColor, dtype=np.uint8)
            # if we would know that the empty fields would be the extreme colors we could do the following:
            # if filterColor==chess.WHITE:
            #    upperColor=(255,255,255)
            # else:
            #    lowerColor=(0,0,0)
            # lower,upper=histogram.mincolor, histogram.maxcolor
            colorMask[filterColor] = cv2.inRange(imageCopy, lowerColor, upperColor)
            # colorMask[filterColor]=histogram.colorMask(imageCopy, 1.5)
            colorFiltered[filterColor] = self.video.maskImage(
                imageCopy, colorMask[filterColor]
            )
            if BoardFinder.debug:
                colorName = "white" if filterColor == chess.WHITE else "black"
                bl, gl, rl = lowerColor
                bu, gu, ru = upperColor
                print(
                    "bgr %s: %3d-%3d, %3d-%3d, %3d-%3d"
                    % (colorName, bl, bu, gl, gu, rl, ru)
                )
                prefix = "colorFiltered-%s-" % (colorName)
                corners.writeDebug(colorFiltered[filterColor], title, prefix)
        backGroundFilter = cv2.bitwise_not(
            cv2.bitwise_or(colorMask[chess.WHITE], colorMask[chess.BLACK])
        )
        imageCopy = image.copy()
        colorFiltered["background"] = self.video.maskImage(imageCopy, backGroundFilter)
        if BoardFinder.debug:
            corners.writeDebug(
                colorFiltered["background"], title, "colorFiltered-background-"
            )
        # side effect - add background histogram
        histograms["background"] = Histogram(
            colorFiltered["background"], histRange=(1, 256)
        )
        return colorFiltered

    def expand(self, image, title, histograms, corners):
        """expand the image finding to 8x8 with the given histograms and corners that are e.g. 7x7,7x5,5x5, ..."""
        if BoardFinder.debug:
            corners.showTrapezDebug(image, title, corners)
        # create a mask for the
        masked8x8 = self.maskPolygon(image, corners.trapez8x8)
        if BoardFinder.debug:
            corners.writeDebug(masked8x8, title, "trapez-masked")
        # draw a 10x10 sized white trapez
        white10x10 = self.video.getEmptyImage(image)
        cv2.fillConvexPoly(white10x10, corners.trapez10x10, BoardFinder.white)
        cv2.fillConvexPoly(white10x10, corners.trapez8x8, BoardFinder.black)
        masked10x10 = white10x10 + masked8x8
        if BoardFinder.debug:
            corners.writeDebug(masked10x10, title, "trapez-white")
        # 9x9 test fails due to a few pixels which are in the way
        # commented out to speed up
        # gray8x8,fullSizeGray8x8=self.preparefindCorners(masked10x10)
        # corners8x8=Corners((9,9),self.video)
        # if corners8x8.findPattern(fullSizeGray8x8):
        #    if BoardFinder.debug:
        #        print("Successfully found 8x8 for %s"+title)
        self.colorFiltered = self.getColorFiltered(
            masked8x8, histograms, title, corners
        )

    def drawPolygon(self, image, pos, polygon, whiteColor, blackColor):
        posColor = self.fieldColor(pos)
        color = blackColor if posColor else whiteColor
        cv2.fillConvexPoly(image, polygon, color)

    def showPolygonDebug(self, image, title, corners):
        """draw polygons for debugging on a copy of the given image with the given corners and write result to the debugImagePath with the given title"""
        imagecopy = image.copy()
        polygons = corners.polygons[0]
        for pos, polygon in polygons.items():
            self.drawPolygon(
                imagecopy, pos, polygon, BoardFinder.lightGrey, BoardFinder.darkGrey
            )
            if Corners.debugSorting:
                row, col = pos
                text = "%d,%d" % (row, col)
                x, y = BoardFinder.centerXY(polygon)
                self.video.drawCenteredText(
                    imagecopy, text, int(x), int(y), fontBGRColor=(255, 0, 0)
                )
        corners.writeDebug(imagecopy, title, "polygons")

    def showHistogramDebug(self, histograms, title, corners):
        """'show' the debug information for the given histograms by writing a plotted histogram image to the debugImagePath"""
        Environment.checkDir(Environment.debugImagePath)
        fig, axes = histograms[True].preparePlot(3, 2)
        histograms[True].plotRow(axes[0, 0], axes[0, 1])
        histograms[False].plotRow(axes[1, 0], axes[1, 1])
        histograms["background"].plotRow(axes[2, 0], axes[2, 1])
        prefix = "histogram"
        filepath = Environment.debugImagePath + "%s-%s-%dx%d.jpg" % (
            title,
            prefix,
            corners.rows,
            corners.cols,
        )
        histograms[False].savefig(fig, filepath)

__init__(image, video=None)

construct me from the given input image

Source code in pcwawc/boardfinder.py
213
214
215
216
217
218
219
220
221
def __init__(self, image, video=None):
    """construct me from the given input image"""
    if video is None:
        video = Video()
    self.video = video
    self.image = image
    # guess the topleft color
    self.topleft = chess.WHITE
    self.height, self.width = self.image.shape[:2]

expand(image, title, histograms, corners)

expand the image finding to 8x8 with the given histograms and corners that are e.g. 7x7,7x5,5x5, ...

Source code in pcwawc/boardfinder.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
def expand(self, image, title, histograms, corners):
    """expand the image finding to 8x8 with the given histograms and corners that are e.g. 7x7,7x5,5x5, ..."""
    if BoardFinder.debug:
        corners.showTrapezDebug(image, title, corners)
    # create a mask for the
    masked8x8 = self.maskPolygon(image, corners.trapez8x8)
    if BoardFinder.debug:
        corners.writeDebug(masked8x8, title, "trapez-masked")
    # draw a 10x10 sized white trapez
    white10x10 = self.video.getEmptyImage(image)
    cv2.fillConvexPoly(white10x10, corners.trapez10x10, BoardFinder.white)
    cv2.fillConvexPoly(white10x10, corners.trapez8x8, BoardFinder.black)
    masked10x10 = white10x10 + masked8x8
    if BoardFinder.debug:
        corners.writeDebug(masked10x10, title, "trapez-white")
    # 9x9 test fails due to a few pixels which are in the way
    # commented out to speed up
    # gray8x8,fullSizeGray8x8=self.preparefindCorners(masked10x10)
    # corners8x8=Corners((9,9),self.video)
    # if corners8x8.findPattern(fullSizeGray8x8):
    #    if BoardFinder.debug:
    #        print("Successfully found 8x8 for %s"+title)
    self.colorFiltered = self.getColorFiltered(
        masked8x8, histograms, title, corners
    )

fieldColor(pos)

determine the field color at the given position

Source code in pcwawc/boardfinder.py
289
290
291
292
293
294
295
296
def fieldColor(self, pos):
    """determine the field color at the given position"""
    row, col = pos
    # the color of the topleft field might be different then A8=WHITE when we have a rotated image
    oddeven = 1 if self.topleft == chess.WHITE else 0
    # calculate the chessboard color of the given position based on the topleft color
    color = chess.WHITE if (col + row) % 2 == oddeven else chess.BLACK
    return color

findChessBoard(image, title)

find a chess board in the given image and return the trapez polygon for it

Source code in pcwawc/boardfinder.py
282
283
284
285
286
287
def findChessBoard(self, image, title):
    """find a chess board in the given image and return the trapez polygon for it"""
    corners = self.findOuterCorners()
    histograms = self.getHistograms(image, title, corners)
    self.expand(image, title, histograms, corners)
    return corners

findCorners(image, limit=1, searchWidth=640)

start finding the chessboard with the given limit and the given maximum width of the search image

Source code in pcwawc/boardfinder.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def findCorners(self, image, limit=1, searchWidth=640):
    """start finding the chessboard with the given limit and the given maximum width of the search image"""
    startt = timer()
    gray, fullSizeGray = self.preparefindCorners(image, searchWidth)
    self.found = {}
    for chesspattern in Corners.genChessPatterns():
        corners = Corners(chesspattern, self.video)
        if corners.findPattern(gray) and corners.findPattern(fullSizeGray):
            corners.sort()
            self.found[chesspattern] = corners
        if len(self.found) >= limit:
            break
    endt = timer()
    if BoardFinder.debug:
        print("found %d patterns in %.1f s" % (len(self.found), (endt - startt)))
    return self.found

findOuterCorners(searchWidth=640)

find my outer corners as limited by the OpenCV findChessBoard algorithm - to be later expanded

Source code in pcwawc/boardfinder.py
236
237
238
239
240
241
242
243
244
245
246
def findOuterCorners(self, searchWidth=640):
    """find my outer corners as limited by the OpenCV findChessBoard algorithm - to be later expanded"""
    found = self.findCorners(self.image, limit=1, searchWidth=searchWidth)
    # we expected to find a board
    if len(found) != 1:
        raise Exception("no corners found")
    chesspattern = next(iter(found))
    corners = found[chesspattern]
    corners.calcPolygons(0, Corners.safetyMargin)
    corners.calcTrapez()
    return corners

getColorFiltered(image, histograms, title, corners)

get color filtered images based on the given histograms

Source code in pcwawc/boardfinder.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def getColorFiltered(self, image, histograms, title, corners):
    """get color filtered images based on the given histograms"""
    colorFiltered = {}
    colorMask = {}
    for filterColor in (chess.WHITE, chess.BLACK):
        histogram = histograms[filterColor]
        imageCopy = image.copy()
        lowerColor, upperColor = histogram.range(1.0)
        # make sure the colors are numpy arrays
        lowerColor = np.array(lowerColor, dtype=np.uint8)
        upperColor = np.array(upperColor, dtype=np.uint8)
        # if we would know that the empty fields would be the extreme colors we could do the following:
        # if filterColor==chess.WHITE:
        #    upperColor=(255,255,255)
        # else:
        #    lowerColor=(0,0,0)
        # lower,upper=histogram.mincolor, histogram.maxcolor
        colorMask[filterColor] = cv2.inRange(imageCopy, lowerColor, upperColor)
        # colorMask[filterColor]=histogram.colorMask(imageCopy, 1.5)
        colorFiltered[filterColor] = self.video.maskImage(
            imageCopy, colorMask[filterColor]
        )
        if BoardFinder.debug:
            colorName = "white" if filterColor == chess.WHITE else "black"
            bl, gl, rl = lowerColor
            bu, gu, ru = upperColor
            print(
                "bgr %s: %3d-%3d, %3d-%3d, %3d-%3d"
                % (colorName, bl, bu, gl, gu, rl, ru)
            )
            prefix = "colorFiltered-%s-" % (colorName)
            corners.writeDebug(colorFiltered[filterColor], title, prefix)
    backGroundFilter = cv2.bitwise_not(
        cv2.bitwise_or(colorMask[chess.WHITE], colorMask[chess.BLACK])
    )
    imageCopy = image.copy()
    colorFiltered["background"] = self.video.maskImage(imageCopy, backGroundFilter)
    if BoardFinder.debug:
        corners.writeDebug(
            colorFiltered["background"], title, "colorFiltered-background-"
        )
    # side effect - add background histogram
    histograms["background"] = Histogram(
        colorFiltered["background"], histRange=(1, 256)
    )
    return colorFiltered

getHistograms(image, title, corners)

get the two histograms for the given corners we don't no what the color of the topleft corner is so we start with a guess

Source code in pcwawc/boardfinder.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
def getHistograms(self, image, title, corners):
    """get the two histograms for the given corners we don't no what the color of the topleft corner is so we start with a guess"""
    histograms = {}
    for filterColor in (True, False):
        imageCopy = image.copy()
        masked = self.maskCornerPolygons(imageCopy, corners, filterColor)
        if BoardFinder.debug:
            prefix = "masked-O-" if filterColor else "masked-X-"
            corners.writeDebug(masked, title, prefix)
        histograms[filterColor] = Histogram(masked, histRange=(1, 256))

    # do we need to fix our guess?
    # is the mean color of black (being filtered) higher then when white is filtered?
    if histograms[chess.BLACK].color > histograms[chess.WHITE].color:
        self.topleft = chess.BLACK
        # swap entries
        tmp = histograms[chess.BLACK]
        histograms[chess.BLACK] = histograms[chess.WHITE]
        histograms[chess.WHITE] = tmp
    return histograms

maskCornerPolygons(image, corners, filterColor)

mask the polygons derived from the given corner points

Source code in pcwawc/boardfinder.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def maskCornerPolygons(self, image, corners, filterColor):
    """mask the polygons derived from the given corner points"""
    mask = self.video.getEmptyImage(image)
    polygons = corners.polygons[Corners.safetyMargin]
    for pos, polygon in polygons.items():
        posColor = self.fieldColor(pos)
        if not posColor == filterColor:
            self.drawPolygon(
                mask, pos, polygon, BoardFinder.white, BoardFinder.white
            )
    # if BoardFinder.debug:
    #    cv2.imshow("mask",mask)
    #    cv2.waitKey(1000)
    masked = self.video.maskImage(image, mask)
    return masked

maskPolygon(image, polygon)

mask the given image with the given polygon

Source code in pcwawc/boardfinder.py
298
299
300
301
302
303
def maskPolygon(self, image, polygon):
    """mask the given image with the given polygon"""
    mask = self.video.getEmptyImage(image)
    cv2.fillConvexPoly(mask, polygon, BoardFinder.white)
    masked = self.video.maskImage(image, mask)
    return masked

showHistogramDebug(histograms, title, corners)

'show' the debug information for the given histograms by writing a plotted histogram image to the debugImagePath

Source code in pcwawc/boardfinder.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def showHistogramDebug(self, histograms, title, corners):
    """'show' the debug information for the given histograms by writing a plotted histogram image to the debugImagePath"""
    Environment.checkDir(Environment.debugImagePath)
    fig, axes = histograms[True].preparePlot(3, 2)
    histograms[True].plotRow(axes[0, 0], axes[0, 1])
    histograms[False].plotRow(axes[1, 0], axes[1, 1])
    histograms["background"].plotRow(axes[2, 0], axes[2, 1])
    prefix = "histogram"
    filepath = Environment.debugImagePath + "%s-%s-%dx%d.jpg" % (
        title,
        prefix,
        corners.rows,
        corners.cols,
    )
    histograms[False].savefig(fig, filepath)

showPolygonDebug(image, title, corners)

draw polygons for debugging on a copy of the given image with the given corners and write result to the debugImagePath with the given title

Source code in pcwawc/boardfinder.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def showPolygonDebug(self, image, title, corners):
    """draw polygons for debugging on a copy of the given image with the given corners and write result to the debugImagePath with the given title"""
    imagecopy = image.copy()
    polygons = corners.polygons[0]
    for pos, polygon in polygons.items():
        self.drawPolygon(
            imagecopy, pos, polygon, BoardFinder.lightGrey, BoardFinder.darkGrey
        )
        if Corners.debugSorting:
            row, col = pos
            text = "%d,%d" % (row, col)
            x, y = BoardFinder.centerXY(polygon)
            self.video.drawCenteredText(
                imagecopy, text, int(x), int(y), fontBGRColor=(255, 0, 0)
            )
    corners.writeDebug(imagecopy, title, "polygons")

sortPoints(xylist) staticmethod

sort points clockwise see https://stackoverflow.com/a/59115565/1497139

Source code in pcwawc/boardfinder.py
229
230
231
232
233
234
@staticmethod
def sortPoints(xylist):
    """sort points clockwise see https://stackoverflow.com/a/59115565/1497139"""
    cx, cy = BoardFinder.centerXY(xylist)
    xy_sorted = sorted(xylist, key=lambda x: math.atan2((x[1] - cy), (x[0] - cx)))
    return xy_sorted

Corners

Bases: object

Chess board corners

Source code in pcwawc/boardfinder.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class Corners(object):
    """Chess board corners"""

    debug = False
    debugSorting = False
    """ pixel margin for masked polygons"""
    safetyMargin = 5

    def __init__(self, pattern, video):
        """initialize me with the given rows and columns"""
        self.pattern = pattern
        self.rows, self.cols = pattern
        self.video = video
        # prepare the dict for my polygons
        self.polygons = {}

    @staticmethod
    def genChessPatterns():
        """generate the patterns 7x7, 5x7, 3x7, 5x5, 5x3, 3x3"""
        for rows in range(7, 2, -2):
            for cols in range(7, rows - 2, -2):
                yield (rows, cols)

    @staticmethod
    def sortXY(xy):
        x, y = xy[0]
        return x, y

    def sort(self):
        """sort the corners - this step is currently not implemented so the corners stay rotated"""
        if Corners.debugSorting:
            print("trying to sort %d points" % (len(self.corners)))
        cornerssorted = sorted(self.corners, key=Corners.sortXY)
        if Corners.debugSorting:
            print(cornerssorted)
        # self.corners = np.empty(shape=(len(self.corners),2),dtype=np.float32)
        # for i,val in enumerate(cornerssorted):
        #    self.corners[i]=val
        pass

    def findPattern(self, image):
        """try finding the chess board corners in the given image with the given pattern"""
        self.h, self.w = image.shape[:2]

        start = timer()
        ret, self.corners = cv2.findChessboardCorners(image, self.pattern, None)
        end = timer()
        if Corners.debug:
            print(
                "%dx%d in %dx%d after %.3f s: %s"
                % (
                    self.rows,
                    self.cols,
                    self.w,
                    self.h,
                    (end - start),
                    "✔" if ret else "❌",
                )
            )
        return ret

    def safeXY(self, x, y, dx, dy):
        """return the given x,y tuple shifted by dx,dy making sure the result is not out of my width and height bounds"""
        x = x + dx
        y = y + dy
        if y >= self.h:
            y = self.h - 1
        if y < 0:
            y = 0
        if x >= self.w:
            x = self.w - 1
        if x < 0:
            x = 0
        return (x, y)

    def asPolygons(self, safetyMargin):
        """get the polygons for my corner points"""
        # reshape the array
        cps = np.reshape(self.corners, (self.cols, self.rows, 2))
        polygons = {}
        m = safetyMargin
        for col in range(self.cols - 1):
            for row in range(self.rows - 1):
                x1, y1 = cps[col, row]  # top left
                x2, y2 = cps[col + 1, row]  # left bottom
                x3, y3 = cps[col + 1, row + 1]  # right bottom
                x4, y4 = cps[col, row + 1]  # top right
                clockwise = BoardFinder.sortPoints(
                    [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]
                )
                (x1, y1), (x2, y2), (x3, y3), (x4, y4) = clockwise
                # https://stackoverflow.com/questions/19190484/what-is-the-opencv-findchessboardcorners-convention
                polygon = np.array(
                    [
                        self.safeXY(x1, y1, +m, +m),
                        self.safeXY(x2, y2, -m, +m),
                        self.safeXY(x3, y3, -m, -m),
                        self.safeXY(x4, y4, +m, -m),
                    ],
                    dtype=np.int32,
                )
                polygons[(col, self.rows - 2 - row)] = polygon
        return polygons

    def calcPolygons(self, *safetyMargins):
        """calculate polygons for the given safety margins"""
        for safetyMargin in safetyMargins:
            self.polygons[safetyMargin] = self.asPolygons(safetyMargin)

    def calcTrapez(self):
        """calculate the relevant quadrilaterals"""
        corners = self.corners
        l = len(self.corners)
        rowend = self.rows - 1
        self.topLeft = corners[0]
        self.topRight = corners[rowend]
        self.bottomRight = corners[l - 1]
        self.bottomLeft = corners[l - 1 - rowend]
        self.trapez2Square = Trapez2Square(
            self.topLeft, self.topRight, self.bottomRight, self.bottomLeft
        )
        self.trapez8x8 = self.trapezColRows(8, 8)
        self.trapez10x10 = self.trapezColRows(10, 10)
        self.trapez = self.trapez2Square.relativeTrapezToTrapezXY(0, 0, 1, 1)
        pass

    def trapezColRows(self, cols, rows):
        """return an expanded trapez with the given number of columns and rows"""
        relDeltaRows = (rows / (self.rows - 1) - 1) / 2
        relDeltaCols = (cols / (self.cols - 1) - 1) / 2
        trapez = self.trapez2Square.relativeTrapezToTrapezXY(
            -relDeltaRows, -relDeltaCols, 1 + relDeltaRows, 1 + relDeltaCols
        )
        return trapez

    def showTrapezDebug(self, image, title, corners):
        """'show' a debug picture with the extrapolated quadrilaterals by writing an image to the debugImagePath"""
        overlay = image.copy()
        # draw polytons from outer to inner
        cv2.fillConvexPoly(overlay, self.trapez10x10, (0, 165, 255))  # orange
        cv2.fillConvexPoly(overlay, self.trapez8x8, (128, 128, 128))  # grey
        cv2.fillConvexPoly(overlay, self.trapez, (225, 105, 65))  # royal blue
        alpha = 0.8  # Transparency factor.
        # overlay the
        imageAlpha = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)
        corners.writeDebug(imageAlpha, title, "trapez")

    def showDebug(self, image, title):
        """'show' the debug picture of the chessboard corners by drawing the corners and writing the result to the given testImagePath"""
        imageCopy = image.copy()
        cv2.drawChessboardCorners(
            imageCopy, self.pattern, self.corners, patternWasFound=True
        )
        if Corners.debugSorting:
            index = 0
            for point in self.topLeft, self.topRight, self.bottomRight, self.bottomLeft:
                text = "%d" % (index)
                x, y = point[0]
                self.video.drawCenteredText(
                    imageCopy, text, int(x), int(y), fontScale=1, fontBGRColor=(0, 0, 0)
                )
                index += 1
        # if Corners.debugSorting:
        #    for index,corner in enumerate(self.corners):
        #        x,y=corner[0]
        #        text="%d" % (index)
        #        self.video.drawCenteredText(imageCopy, text, int(x), int(y), fontScale=0.5,fontBGRColor=(128,128,128))
        # cv2.imshow('corners', self.image)
        # cv2.waitKey(50)
        self.writeDebug(imageCopy, title, "corners")

    def writeDebug(self, image, title, prefix):
        Environment.checkDir(Environment.debugImagePath)
        cv2.imwrite(
            Environment.debugImagePath
            + "%s-%s-%dx%d.jpg" % (title, prefix, self.rows, self.cols),
            image,
        )

debugSorting = False class-attribute instance-attribute

pixel margin for masked polygons

__init__(pattern, video)

initialize me with the given rows and columns

Source code in pcwawc/boardfinder.py
31
32
33
34
35
36
37
def __init__(self, pattern, video):
    """initialize me with the given rows and columns"""
    self.pattern = pattern
    self.rows, self.cols = pattern
    self.video = video
    # prepare the dict for my polygons
    self.polygons = {}

asPolygons(safetyMargin)

get the polygons for my corner points

Source code in pcwawc/boardfinder.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def asPolygons(self, safetyMargin):
    """get the polygons for my corner points"""
    # reshape the array
    cps = np.reshape(self.corners, (self.cols, self.rows, 2))
    polygons = {}
    m = safetyMargin
    for col in range(self.cols - 1):
        for row in range(self.rows - 1):
            x1, y1 = cps[col, row]  # top left
            x2, y2 = cps[col + 1, row]  # left bottom
            x3, y3 = cps[col + 1, row + 1]  # right bottom
            x4, y4 = cps[col, row + 1]  # top right
            clockwise = BoardFinder.sortPoints(
                [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]
            )
            (x1, y1), (x2, y2), (x3, y3), (x4, y4) = clockwise
            # https://stackoverflow.com/questions/19190484/what-is-the-opencv-findchessboardcorners-convention
            polygon = np.array(
                [
                    self.safeXY(x1, y1, +m, +m),
                    self.safeXY(x2, y2, -m, +m),
                    self.safeXY(x3, y3, -m, -m),
                    self.safeXY(x4, y4, +m, -m),
                ],
                dtype=np.int32,
            )
            polygons[(col, self.rows - 2 - row)] = polygon
    return polygons

calcPolygons(*safetyMargins)

calculate polygons for the given safety margins

Source code in pcwawc/boardfinder.py
127
128
129
130
def calcPolygons(self, *safetyMargins):
    """calculate polygons for the given safety margins"""
    for safetyMargin in safetyMargins:
        self.polygons[safetyMargin] = self.asPolygons(safetyMargin)

calcTrapez()

calculate the relevant quadrilaterals

Source code in pcwawc/boardfinder.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def calcTrapez(self):
    """calculate the relevant quadrilaterals"""
    corners = self.corners
    l = len(self.corners)
    rowend = self.rows - 1
    self.topLeft = corners[0]
    self.topRight = corners[rowend]
    self.bottomRight = corners[l - 1]
    self.bottomLeft = corners[l - 1 - rowend]
    self.trapez2Square = Trapez2Square(
        self.topLeft, self.topRight, self.bottomRight, self.bottomLeft
    )
    self.trapez8x8 = self.trapezColRows(8, 8)
    self.trapez10x10 = self.trapezColRows(10, 10)
    self.trapez = self.trapez2Square.relativeTrapezToTrapezXY(0, 0, 1, 1)
    pass

findPattern(image)

try finding the chess board corners in the given image with the given pattern

Source code in pcwawc/boardfinder.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def findPattern(self, image):
    """try finding the chess board corners in the given image with the given pattern"""
    self.h, self.w = image.shape[:2]

    start = timer()
    ret, self.corners = cv2.findChessboardCorners(image, self.pattern, None)
    end = timer()
    if Corners.debug:
        print(
            "%dx%d in %dx%d after %.3f s: %s"
            % (
                self.rows,
                self.cols,
                self.w,
                self.h,
                (end - start),
                "✔" if ret else "❌",
            )
        )
    return ret

genChessPatterns() staticmethod

generate the patterns 7x7, 5x7, 3x7, 5x5, 5x3, 3x3

Source code in pcwawc/boardfinder.py
39
40
41
42
43
44
@staticmethod
def genChessPatterns():
    """generate the patterns 7x7, 5x7, 3x7, 5x5, 5x3, 3x3"""
    for rows in range(7, 2, -2):
        for cols in range(7, rows - 2, -2):
            yield (rows, cols)

safeXY(x, y, dx, dy)

return the given x,y tuple shifted by dx,dy making sure the result is not out of my width and height bounds

Source code in pcwawc/boardfinder.py
84
85
86
87
88
89
90
91
92
93
94
95
96
def safeXY(self, x, y, dx, dy):
    """return the given x,y tuple shifted by dx,dy making sure the result is not out of my width and height bounds"""
    x = x + dx
    y = y + dy
    if y >= self.h:
        y = self.h - 1
    if y < 0:
        y = 0
    if x >= self.w:
        x = self.w - 1
    if x < 0:
        x = 0
    return (x, y)

showDebug(image, title)

'show' the debug picture of the chessboard corners by drawing the corners and writing the result to the given testImagePath

Source code in pcwawc/boardfinder.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def showDebug(self, image, title):
    """'show' the debug picture of the chessboard corners by drawing the corners and writing the result to the given testImagePath"""
    imageCopy = image.copy()
    cv2.drawChessboardCorners(
        imageCopy, self.pattern, self.corners, patternWasFound=True
    )
    if Corners.debugSorting:
        index = 0
        for point in self.topLeft, self.topRight, self.bottomRight, self.bottomLeft:
            text = "%d" % (index)
            x, y = point[0]
            self.video.drawCenteredText(
                imageCopy, text, int(x), int(y), fontScale=1, fontBGRColor=(0, 0, 0)
            )
            index += 1
    # if Corners.debugSorting:
    #    for index,corner in enumerate(self.corners):
    #        x,y=corner[0]
    #        text="%d" % (index)
    #        self.video.drawCenteredText(imageCopy, text, int(x), int(y), fontScale=0.5,fontBGRColor=(128,128,128))
    # cv2.imshow('corners', self.image)
    # cv2.waitKey(50)
    self.writeDebug(imageCopy, title, "corners")

showTrapezDebug(image, title, corners)

'show' a debug picture with the extrapolated quadrilaterals by writing an image to the debugImagePath

Source code in pcwawc/boardfinder.py
158
159
160
161
162
163
164
165
166
167
168
def showTrapezDebug(self, image, title, corners):
    """'show' a debug picture with the extrapolated quadrilaterals by writing an image to the debugImagePath"""
    overlay = image.copy()
    # draw polytons from outer to inner
    cv2.fillConvexPoly(overlay, self.trapez10x10, (0, 165, 255))  # orange
    cv2.fillConvexPoly(overlay, self.trapez8x8, (128, 128, 128))  # grey
    cv2.fillConvexPoly(overlay, self.trapez, (225, 105, 65))  # royal blue
    alpha = 0.8  # Transparency factor.
    # overlay the
    imageAlpha = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)
    corners.writeDebug(imageAlpha, title, "trapez")

sort()

sort the corners - this step is currently not implemented so the corners stay rotated

Source code in pcwawc/boardfinder.py
51
52
53
54
55
56
57
58
59
60
61
def sort(self):
    """sort the corners - this step is currently not implemented so the corners stay rotated"""
    if Corners.debugSorting:
        print("trying to sort %d points" % (len(self.corners)))
    cornerssorted = sorted(self.corners, key=Corners.sortXY)
    if Corners.debugSorting:
        print(cornerssorted)
    # self.corners = np.empty(shape=(len(self.corners),2),dtype=np.float32)
    # for i,val in enumerate(cornerssorted):
    #    self.corners[i]=val
    pass

trapezColRows(cols, rows)

return an expanded trapez with the given number of columns and rows

Source code in pcwawc/boardfinder.py
149
150
151
152
153
154
155
156
def trapezColRows(self, cols, rows):
    """return an expanded trapez with the given number of columns and rows"""
    relDeltaRows = (rows / (self.rows - 1) - 1) / 2
    relDeltaCols = (cols / (self.cols - 1) - 1) / 2
    trapez = self.trapez2Square.relativeTrapezToTrapezXY(
        -relDeltaRows, -relDeltaCols, 1 + relDeltaRows, 1 + relDeltaCols
    )
    return trapez

chessengine

Created on 2019-12-16

@author: wf

Engine

Chess Engines support e.g. Universal Chess Interface see https://chessprogramming.wikispaces.com/UCI

Source code in pcwawc/chessengine.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class Engine:
    """Chess Engines support e.g. Universal Chess Interface see https://chessprogramming.wikispaces.com/UCI"""

    # shall we output debug information?
    debug = False

    # list of engines to search for
    engineConfigs = [
        {
            "name": "Crafty Computer Chess",
            "command": "crafty",
            "url": "http://www.craftychess.com/",
            "protocol": "?",
        },
        {
            "name": "GNU Chess",
            "command": "gnuchess",
            "url": "https://www.gnu.org/software/chess/",
            "options": "--uci",
            "protocol": "uci",
        },
        {
            "name": "Stockfish Chess",
            "command": "stockfish",
            "url": "https://stockfishchess.org/",
            "protocol": "uci",
        },
        {
            "name": "XBoard",
            "command": "xboard",
            "url": "https://www.gnu.org/software/xboard/manual/xboard.html",
            "protocol": "xboard",
        },
    ]

    def __init__(self, engineConfig, timeout=5):
        """
        construct me with the given engineConfiguration and timeout

        Args:
            engineConfig(dict): the engine configuration to use
            timeout(float): the number of seconds to wait of the given engine
        """
        self.engineCmd = engineConfig["command"]
        self.enginePath = engineConfig["path"]
        self.name = engineConfig["name"]
        self.url = engineConfig["url"]
        self.protocolName = engineConfig["protocol"]
        self.options = engineConfig["options"] if "options" in engineConfig else None
        if self.protocolName == "uci":
            self.protocol = chess.engine.UciProtocol
        elif self.protocolName == "xboard":
            self.protocol = chess.engine.XBoardProtocol
        else:
            self.protocol = None
        self.timeout = timeout
        # asyncio.set_event_loop_policy(chess.engine.EventLoopPolicy())
        # https://python-chess.readthedocs.io/en/latest/engine.html
        self.error = None

    def open(self):
        """
        open this chess engine
        """
        self.engine = None
        self.error = None
        if self.protocol is None:
            self.error = Exception("unknown protocol for %s" % self.name)
        else:
            try:
                cmd = [self.engineCmd]
                if self.options:
                    cmd.append(self.options)
                self.engine = chess.engine.SimpleEngine.popen(
                    self.protocol, cmd, timeout=self.timeout, debug=Engine.debug
                )
            except Exception as te:
                self.error = te
                pass
        return self.engine

    def close(self):
        """
        close the given engine
        """
        if self.engine is not None:
            try:
                self.engine.quit()
            except concurrent.futures._base.TimeoutError:
                # so what?
                if self.debug:
                    print(
                        "timeout after %1.f secs for %s on close forcing close now ..."
                        % (self.timeout, self.name)
                    )
                self.engine.close()
                pass

    def __str__(self):
        text = "chess engine %s called via %s at %s" % (
            self.name,
            self.engineCmd,
            self.enginePath,
        )
        return text

    @staticmethod
    def findEngines():
        """
        check which engines from the Engine configurations are installed
        on this computer and return a dict of these

        Returns:
            dict: map of Engines by Command e.g. "gnuchess" or "stockfish"
        """
        engineDict = {}
        for engineConfig in Engine.engineConfigs:
            engineCmd = engineConfig["command"]
            enginePath = shutil.which(engineCmd)
            if enginePath is not None:
                if Engine.debug:
                    print("found %s engine at %s" % (engineCmd, enginePath))
                engineConfig["path"] = enginePath
                engineDict[engineCmd] = engineConfig
        return engineDict

__init__(engineConfig, timeout=5)

construct me with the given engineConfiguration and timeout

Parameters:

Name Type Description Default
engineConfig(dict)

the engine configuration to use

required
timeout(float)

the number of seconds to wait of the given engine

required
Source code in pcwawc/chessengine.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(self, engineConfig, timeout=5):
    """
    construct me with the given engineConfiguration and timeout

    Args:
        engineConfig(dict): the engine configuration to use
        timeout(float): the number of seconds to wait of the given engine
    """
    self.engineCmd = engineConfig["command"]
    self.enginePath = engineConfig["path"]
    self.name = engineConfig["name"]
    self.url = engineConfig["url"]
    self.protocolName = engineConfig["protocol"]
    self.options = engineConfig["options"] if "options" in engineConfig else None
    if self.protocolName == "uci":
        self.protocol = chess.engine.UciProtocol
    elif self.protocolName == "xboard":
        self.protocol = chess.engine.XBoardProtocol
    else:
        self.protocol = None
    self.timeout = timeout
    # asyncio.set_event_loop_policy(chess.engine.EventLoopPolicy())
    # https://python-chess.readthedocs.io/en/latest/engine.html
    self.error = None

close()

close the given engine

Source code in pcwawc/chessengine.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def close(self):
    """
    close the given engine
    """
    if self.engine is not None:
        try:
            self.engine.quit()
        except concurrent.futures._base.TimeoutError:
            # so what?
            if self.debug:
                print(
                    "timeout after %1.f secs for %s on close forcing close now ..."
                    % (self.timeout, self.name)
                )
            self.engine.close()
            pass

findEngines() staticmethod

check which engines from the Engine configurations are installed on this computer and return a dict of these

Returns:

Name Type Description
dict

map of Engines by Command e.g. "gnuchess" or "stockfish"

Source code in pcwawc/chessengine.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@staticmethod
def findEngines():
    """
    check which engines from the Engine configurations are installed
    on this computer and return a dict of these

    Returns:
        dict: map of Engines by Command e.g. "gnuchess" or "stockfish"
    """
    engineDict = {}
    for engineConfig in Engine.engineConfigs:
        engineCmd = engineConfig["command"]
        enginePath = shutil.which(engineCmd)
        if enginePath is not None:
            if Engine.debug:
                print("found %s engine at %s" % (engineCmd, enginePath))
            engineConfig["path"] = enginePath
            engineDict[engineCmd] = engineConfig
    return engineDict

open()

open this chess engine

Source code in pcwawc/chessengine.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def open(self):
    """
    open this chess engine
    """
    self.engine = None
    self.error = None
    if self.protocol is None:
        self.error = Exception("unknown protocol for %s" % self.name)
    else:
        try:
            cmd = [self.engineCmd]
            if self.options:
                cmd.append(self.options)
            self.engine = chess.engine.SimpleEngine.popen(
                self.protocol, cmd, timeout=self.timeout, debug=Engine.debug
            )
        except Exception as te:
            self.error = te
            pass
    return self.engine

chessimage

Created on 2019-12-10

@author: wf

ChessBoardImage

a chessboard image and it's transformations

Source code in pcwawc/chessimage.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
@implementer(IChessBoardImage)
class ChessBoardImage:
    """a chessboard image and it's transformations"""

    def __init__(self, image, title):
        self.image = image
        self.title = title
        self.height, self.width = image.shape[:2]
        self.pixels = self.height * self.width

    def diffBoardImage(self, cbOther):
        if cbOther is None:
            raise Exception("other is None for diff")
        h, w = self.height, self.width
        ho, wo = cbOther.height, cbOther.width
        if not h == ho or not w == wo:
            raise Exception(
                "image %d x %d has to have same size as other %d x %d for diff"
                % (w, h, wo, ho)
            )
        # return np.subtract(self.image,other)
        diff = cv2.absdiff(self.image, cbOther.image)
        return ChessBoardImage(diff, "diff")

    def showDebug(self, video=None, keyWait=5):
        if video is None:
            video = Video()
        video.showImage(self.image, self.title, keyWait=keyWait)

ChessBoardImageSet

a set of images of the current chess board

Source code in pcwawc/chessimage.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@implementer(IChessBoardImageSet)
class ChessBoardImageSet:
    """a set of images of the current chess board"""

    def __init__(self, vision, image, frameIndex, timeStamp):
        self.vision = vision
        self.frameIndex = frameIndex
        # see https://stackoverflow.com/questions/47743246/getting-timestamp-of-each-frame-in-a-video
        self.timeStamp = timeStamp
        self.cbImage = ChessBoardImage(image, "chessboard")
        self.cbGUI = self.cbImage
        self.cbWarped = None
        self.cbIdeal = None
        self.cbPreMove = None
        self.cbDiff = None
        self.cbDebug = None

    def placeHolder(self, cbImage):
        """return an empty image if the image is not available"""
        if cbImage is None:
            return self.vision.video.createBlank(
                self.cbWarped.width, self.cbWarped.height, (128, 128, 128)
            )
        else:
            return cbImage.image

    def debugImage(self):
        if self.cbDebug is None:
            self.cbDebug = self.debugImage2x2(
                self.cbWarped, self.cbIdeal, self.cbDiff, self.cbPreMove
            )
        return self.cbDebug

    def debugImage2x2(self, image1, image2, image3, image4):
        image = self.vision.video.as2x2(
            self.placeHolder(image1),
            self.placeHolder(image2),
            self.placeHolder(image3),
            self.placeHolder(image4),
        )
        return ChessBoardImage(image, "debug")

    def showDebug(self, video=None):
        video.showImage(self.debugImage().image, "debug")

    def warpAndRotate(self, nowarp=False):
        """warp and rotate the image as necessary - add timestamp if in debug mode"""
        video = self.vision.video
        warp = self.vision.warp
        if warp.warping or nowarp:
            if nowarp:
                warped = self.cbImage.image.copy()
            else:
                warped = video.warp(self.cbImage.image, warp.points)
            if warp.rotation > 0:
                warped = video.rotate(warped, warp.rotation)
        else:
            warped = self.cbImage.image.copy()
        self.cbWarped = ChessBoardImage(warped, "warped")

    def prepareGUI(self):
        video = self.vision.video
        warp = self.vision.warp
        if self.vision.debug:
            self.cbGUI = self.debugImage()
            video.addTimeStamp(self.cbGUI.image)
        else:
            self.cbGUI = ChessBoardImage(self.cbWarped.image.copy(), "gui")
            if not warp.warping:
                video.drawTrapezoid(self.cbGUI.image, warp.points, warp.bgrColor)

placeHolder(cbImage)

return an empty image if the image is not available

Source code in pcwawc/chessimage.py
123
124
125
126
127
128
129
130
def placeHolder(self, cbImage):
    """return an empty image if the image is not available"""
    if cbImage is None:
        return self.vision.video.createBlank(
            self.cbWarped.width, self.cbWarped.height, (128, 128, 128)
        )
    else:
        return cbImage.image

warpAndRotate(nowarp=False)

warp and rotate the image as necessary - add timestamp if in debug mode

Source code in pcwawc/chessimage.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def warpAndRotate(self, nowarp=False):
    """warp and rotate the image as necessary - add timestamp if in debug mode"""
    video = self.vision.video
    warp = self.vision.warp
    if warp.warping or nowarp:
        if nowarp:
            warped = self.cbImage.image.copy()
        else:
            warped = video.warp(self.cbImage.image, warp.points)
        if warp.rotation > 0:
            warped = video.rotate(warped, warp.rotation)
    else:
        warped = self.cbImage.image.copy()
    self.cbWarped = ChessBoardImage(warped, "warped")

ChessBoardVision

Bases: JsonAbleMixin

implements access to chessboard images

Source code in pcwawc/chessimage.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@implementer(IChessBoardVision)
class ChessBoardVision(JsonAbleMixin):
    """implements access to chessboard images"""

    def __init__(self, args, board=None):
        self.device = args.input
        self.title = Video.title(self.device)
        self.video = Video(self.title)
        self.video.headless = Environment.inContinuousIntegration()
        self.args = args
        self.showDebug = args.debug
        self.start = None
        self.quitWanted = False
        self.hasImage = False
        self.timestamps = []
        self.debug = args.debug
        if board is None:
            board = Board(args=args)
        self.board = board
        if self.args.fen is not None:
            self.board.updatePieces(self.args.fen)
        self.warp = Warp(args.warpPointList)
        self.warp.rotation = args.rotation
        if self.args.nowarp:
            self.warp.warping = True
        self.firstFrame = True
        self.speedup = args.speedup
        pass

    def open(self, device):
        self.video.capture(device)
        self.device = device
        self.firstFrame = True

    def readChessBoardImage(self):
        for i in range(self.speedup):
            self.hasImage, image, self.quitWanted = self.video.readFrame(self.showDebug)
            if self.quitWanted:
                return self.previous
        frames = self.video.frames
        if self.firstFrame:
            self.start = timer()
        timestamp = timer() - self.start
        self.chessBoardImageSet = ChessBoardImageSet(
            self, image, frames // self.speedup, timestamp
        )
        self.firstFrame = False
        self.timestamps.append(timestamp)
        return self.chessBoardImageSet

    def close(self):
        self.video.close()

    def __getstate__(self):
        state = {}
        state["title"] = self.title
        device = self.device
        if not Video.is_int(device):
            cwd = os.getcwd()
            devicepath = os.path.dirname(device)
            root = os.path.commonpath([cwd, devicepath])
            device = os.path.relpath(devicepath, root) + "/" + os.path.basename(device)
        state["device"] = device
        state["timestamps"] = self.timestamps
        return state

    def __setstate__(self, state):
        self.title = state["title"]
        self.device = state["device"]
        self.timestamps = state["timestamps"]

    def save(self, path="games/videos"):
        env = Environment()
        savepath = str(env.projectPath) + "/" + path
        Environment.checkDir(savepath)
        jsonFile = savepath + "/" + self.title
        self.writeJson(jsonFile)

Warp

Bases: YamlAbleMixin, JsonAbleMixin

holds the trapezoid points to be use for warping an image take from a peculiar angle

Source code in pcwawc/chessimage.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
@implementer(IWarp)
class Warp(YamlAbleMixin, JsonAbleMixin):
    """holds the trapezoid points to be use for warping an image take from a peculiar angle"""

    # construct me from the given setting
    def __init__(self, pointList=[], rotation=0, bgrColor=(0, 255, 0)):
        self.rotation = rotation
        self.bgrColor = bgrColor
        self.pointList = pointList
        self.updatePoints()

    def rotate(self, angle):
        """rotate me by the given angle"""
        self.rotation = self.rotation + angle
        if self.rotation >= 360:
            self.rotation = self.rotation % 360

    def updatePoints(self):
        """update my points"""
        pointLen = len(self.pointList)
        if pointLen == 0:
            self.points = None
        else:
            self.points = np.array(self.pointList)
        self.warping = pointLen == 4

    def addPoint(self, px, py):
        """add a point with the given px,py coordinate
        to the warp points make sure we have a maximum of 4 warpPoints if warppoints are complete when adding reset them
        this allows to support click UIs that need an unwarped image before setting new warp points.
        px,py is irrelevant for reset"""
        if len(self.pointList) >= 4:
            self.pointList = []
        else:
            self.pointList.append([px, py])
        self.updatePoints()

addPoint(px, py)

add a point with the given px,py coordinate to the warp points make sure we have a maximum of 4 warpPoints if warppoints are complete when adding reset them this allows to support click UIs that need an unwarped image before setting new warp points. px,py is irrelevant for reset

Source code in pcwawc/chessimage.py
204
205
206
207
208
209
210
211
212
213
def addPoint(self, px, py):
    """add a point with the given px,py coordinate
    to the warp points make sure we have a maximum of 4 warpPoints if warppoints are complete when adding reset them
    this allows to support click UIs that need an unwarped image before setting new warp points.
    px,py is irrelevant for reset"""
    if len(self.pointList) >= 4:
        self.pointList = []
    else:
        self.pointList.append([px, py])
    self.updatePoints()

rotate(angle)

rotate me by the given angle

Source code in pcwawc/chessimage.py
189
190
191
192
193
def rotate(self, angle):
    """rotate me by the given angle"""
    self.rotation = self.rotation + angle
    if self.rotation >= 360:
        self.rotation = self.rotation % 360

updatePoints()

update my points

Source code in pcwawc/chessimage.py
195
196
197
198
199
200
201
202
def updatePoints(self):
    """update my points"""
    pointLen = len(self.pointList)
    if pointLen == 0:
        self.points = None
    else:
        self.points = np.array(self.pointList)
    self.warping = pointLen == 4

chesstrapezoid

ChessTSquare

a chess square in it's trapezoidal perspective

Source code in pcwawc/chesstrapezoid.py
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
@implementer(ISquare)
class ChessTSquare:
    """a chess square in it's trapezoidal perspective"""

    # relative position and size of original square
    rw = 1 / (ChessTrapezoid.rows)
    rh = 1 / (ChessTrapezoid.cols)

    showDebugChange = []

    def __init__(self, trapez, square):
        """construct me from the given trapez  and square"""
        self.trapez = trapez
        self.changeStats = MinMaxStats()
        self.square = square
        self.an = chess.SQUARE_NAMES[square]
        # rank are rows in Algebraic Notation from 1 to 8
        self.row = ChessTrapezoid.rows - 1 - chess.square_rank(square)
        # files are columns in Algebraic Notation from A to H
        self.col = chess.square_file(square)
        # https://gamedev.stackexchange.com/a/44998/133453
        self.fieldColor = chess.WHITE if (self.col + self.row) % 2 == 1 else chess.BLACK
        self.fieldState = None
        self.piece = None
        self.preMoveImage = None
        self.postMoveImage = None

        self.rPieceRadius = ChessTSquare.rw / ChessTrapezoid.PieceRadiusFactor

        self.rx, self.ry = self.col * ChessTSquare.rw, self.row * ChessTSquare.rh
        self.rcx = self.rx + ChessTSquare.rw * 0.5
        self.rcy = self.ry + ChessTSquare.rh * 0.5
        self.x, self.y = trapez.relativeToTrapezXY(self.rx, self.ry)
        self.setPolygons(
            trapez,
            self.rx,
            self.ry,
            self.rx + ChessTSquare.rw,
            self.ry,
            self.rx + ChessTSquare.rw,
            self.ry + ChessTSquare.rh,
            self.rx,
            self.ry + ChessTSquare.rh,
        )

    def setPolygons(
        self, trapez, rtl_x, rtl_y, rtr_x, rtr_y, rbr_x, rbr_y, rbl_x, rbl_y
    ):
        """set my relative and warped polygons from the given relative corner coordinates from top left via top right, bottom right to bottom left"""
        self.rpolygon = np.array(
            [(rtl_x, rtl_y), (rtr_x, rtr_y), (rbr_x, rbr_y), (rbl_x, rbl_y)]
        )
        self.idealPolygon = (self.rpolygon * trapez.idealSize).astype(np.int32)
        # function to use to calculate polygon
        r2t = trapez.relativeToTrapezXY
        self.polygon = np.array(
            [r2t(rtl_x, rtl_y), r2t(rtr_x, rtr_y), r2t(rbr_x, rbr_y), r2t(rbl_x, rbl_y)]
        )
        self.ipolygon = self.polygon.astype(np.int32)

    def getPolygon(self, transformation):
        if transformation == Transformation.ORIGINAL:
            return self.ipolygon
        elif transformation == Transformation.RELATIVE:
            return self.rpolygon
        elif transformation == Transformation.IDEAL:
            return self.idealPolygon
        else:
            raise Exception("invalid transformation %d for getPolygon", transformation)

    def getFieldState(self):
        piece = self.piece
        if piece is None:
            if self.fieldColor == chess.WHITE:
                return FieldState.WHITE_EMPTY
            else:
                return FieldState.BLACK_EMPTY
        elif piece.color == chess.WHITE:
            if self.fieldColor == chess.WHITE:
                return FieldState.WHITE_WHITE
            else:
                return FieldState.BLACK_WHITE
        else:
            if self.fieldColor == chess.WHITE:
                return FieldState.WHITE_BLACK
            else:
                return FieldState.BLACK_BLACK
        # this can't happen
        return None

    def drawState(self, image, transformation, channels):
        """draw my state onto the given image with the given transformation and number of channels"""
        # default is drawing a single channel mask
        squareImageColor = 64
        pieceImageColor = squareImageColor
        if channels == 3:
            if self.fieldColor == chess.WHITE:
                if FieldState.WHITE_EMPTY in self.trapez.averageColors:
                    squareImageColor = self.trapez.averageColors[
                        FieldState.WHITE_EMPTY
                    ].color
                else:
                    squareImageColor = Color.white
            else:
                if FieldState.BLACK_EMPTY in self.trapez.averageColors:
                    squareImageColor = self.trapez.averageColors[
                        FieldState.BLACK_EMPTY
                    ].color
                else:
                    squareImageColor = Color.black

        if not (channels == 1 and self.piece is not None):
            self.trapez.video.drawPolygon(
                image, self.getPolygon(transformation), squareImageColor
            )

        if self.piece is not None:
            if channels == 3:
                if self.fieldState in self.trapez.averageColors:
                    pieceImageColor = self.trapez.averageColors[self.fieldState].color
                else:
                    pieceImageColor = (
                        Color.darkgrey
                        if self.piece.color == chess.BLACK
                        else Color.lightgrey
                    )
            rcenter = self.rcenter()
            self.trapez.drawRCircle(image, rcenter, self.rPieceRadius, pieceImageColor)

    def rcenter(self):
        rcx = self.rx + ChessTSquare.rw / 2
        rcy = self.ry + ChessTSquare.rh / 2
        return (rcx, rcy)

    def rxy2xy(self, image):
        h, w = image.shape[:2]
        x = int(self.rx * w)
        y = int(self.ry * h)
        dh = h // ChessTrapezoid.rows
        dw = w // ChessTrapezoid.cols
        return h, w, x, y, dh, dw

    def addPreMoveImage(self, image):
        if self.preMoveImage is not None:
            h, w, x, y, dh, dw = self.rxy2xy(image)
            np.copyto(image[y : y + dh, x : x + dw], self.preMoveImage)

    def drawDebug(self, image, color=(255, 255, 255)):
        """draw debug information onto the given image using the given color"""
        symbol = ""
        if self.piece is not None:
            symbol = (
                self.piece.symbol()
            )  # @TODO piece.unicode_symbol() - needs other font!
        squareHint = self.an + " " + symbol
        rcx, rcy = self.rcenter()
        self.trapez.drawRCenteredText(image, squareHint, rcx, rcy, color=color)

    def getSquareImage(self, cbImage):
        """get the image of me within the given image"""
        h, w, x, y, dh, dw = self.rxy2xy(cbImage.image)
        squareImage = cbImage.image[y : y + dh, x : x + dw]
        return squareImage

    def squareChange(self, image, diffImage):
        """check the changes analyzing the difference image of this square"""
        h, w, x, y, dh, dw = self.rxy2xy(image)

        self.squareImage = image[y : y + dh, x : x + dw]
        self.diffImage = diffImage[y : y + dh, x : x + dw]
        diffSum = np.sum(self.diffImage)
        # the value is 64 times lower then the per pixel value
        self.currentChange = SquareChange(diffSum / (h * w), self.changeStats)
        return self.currentChange

    def checkMoved(self, detectState):
        """check a figure has been moved, so that the state of this square has changed"""
        squareChange = self.currentChange
        # if the whole board is valid
        if detectState.validBoard:
            # if we come from an stable invalid period then this is likely a move
            if detectState.invalidStable and self.preMoveImage is not None:
                if not squareChange.valid:
                    self.postMoveImage = self.squareImage
                    if detectState.onPieceMoveDetected is not None:
                        detectState.onPieceMoveDetected(self)
                    self.changeStats.clear()
                    self.preMoveImage = None

            detectState.invalidEnd()
            # add the current change statistics to my statistics
            squareChange.push(self.changeStats, squareChange.value)
            # if we have been valid for a long enough period of time
            if detectState.validStable:
                # remember my image - we are ready to detect a move
                self.preMoveImage = self.squareImage
                pass
        else:
            if detectState.invalidStarted:
                detectState.validEnd()

__init__(trapez, square)

construct me from the given trapez and square

Source code in pcwawc/chesstrapezoid.py
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
def __init__(self, trapez, square):
    """construct me from the given trapez  and square"""
    self.trapez = trapez
    self.changeStats = MinMaxStats()
    self.square = square
    self.an = chess.SQUARE_NAMES[square]
    # rank are rows in Algebraic Notation from 1 to 8
    self.row = ChessTrapezoid.rows - 1 - chess.square_rank(square)
    # files are columns in Algebraic Notation from A to H
    self.col = chess.square_file(square)
    # https://gamedev.stackexchange.com/a/44998/133453
    self.fieldColor = chess.WHITE if (self.col + self.row) % 2 == 1 else chess.BLACK
    self.fieldState = None
    self.piece = None
    self.preMoveImage = None
    self.postMoveImage = None

    self.rPieceRadius = ChessTSquare.rw / ChessTrapezoid.PieceRadiusFactor

    self.rx, self.ry = self.col * ChessTSquare.rw, self.row * ChessTSquare.rh
    self.rcx = self.rx + ChessTSquare.rw * 0.5
    self.rcy = self.ry + ChessTSquare.rh * 0.5
    self.x, self.y = trapez.relativeToTrapezXY(self.rx, self.ry)
    self.setPolygons(
        trapez,
        self.rx,
        self.ry,
        self.rx + ChessTSquare.rw,
        self.ry,
        self.rx + ChessTSquare.rw,
        self.ry + ChessTSquare.rh,
        self.rx,
        self.ry + ChessTSquare.rh,
    )

checkMoved(detectState)

check a figure has been moved, so that the state of this square has changed

Source code in pcwawc/chesstrapezoid.py
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
def checkMoved(self, detectState):
    """check a figure has been moved, so that the state of this square has changed"""
    squareChange = self.currentChange
    # if the whole board is valid
    if detectState.validBoard:
        # if we come from an stable invalid period then this is likely a move
        if detectState.invalidStable and self.preMoveImage is not None:
            if not squareChange.valid:
                self.postMoveImage = self.squareImage
                if detectState.onPieceMoveDetected is not None:
                    detectState.onPieceMoveDetected(self)
                self.changeStats.clear()
                self.preMoveImage = None

        detectState.invalidEnd()
        # add the current change statistics to my statistics
        squareChange.push(self.changeStats, squareChange.value)
        # if we have been valid for a long enough period of time
        if detectState.validStable:
            # remember my image - we are ready to detect a move
            self.preMoveImage = self.squareImage
            pass
    else:
        if detectState.invalidStarted:
            detectState.validEnd()

drawDebug(image, color=(255, 255, 255))

draw debug information onto the given image using the given color

Source code in pcwawc/chesstrapezoid.py
669
670
671
672
673
674
675
676
677
678
def drawDebug(self, image, color=(255, 255, 255)):
    """draw debug information onto the given image using the given color"""
    symbol = ""
    if self.piece is not None:
        symbol = (
            self.piece.symbol()
        )  # @TODO piece.unicode_symbol() - needs other font!
    squareHint = self.an + " " + symbol
    rcx, rcy = self.rcenter()
    self.trapez.drawRCenteredText(image, squareHint, rcx, rcy, color=color)

drawState(image, transformation, channels)

draw my state onto the given image with the given transformation and number of channels

Source code in pcwawc/chesstrapezoid.py
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
def drawState(self, image, transformation, channels):
    """draw my state onto the given image with the given transformation and number of channels"""
    # default is drawing a single channel mask
    squareImageColor = 64
    pieceImageColor = squareImageColor
    if channels == 3:
        if self.fieldColor == chess.WHITE:
            if FieldState.WHITE_EMPTY in self.trapez.averageColors:
                squareImageColor = self.trapez.averageColors[
                    FieldState.WHITE_EMPTY
                ].color
            else:
                squareImageColor = Color.white
        else:
            if FieldState.BLACK_EMPTY in self.trapez.averageColors:
                squareImageColor = self.trapez.averageColors[
                    FieldState.BLACK_EMPTY
                ].color
            else:
                squareImageColor = Color.black

    if not (channels == 1 and self.piece is not None):
        self.trapez.video.drawPolygon(
            image, self.getPolygon(transformation), squareImageColor
        )

    if self.piece is not None:
        if channels == 3:
            if self.fieldState in self.trapez.averageColors:
                pieceImageColor = self.trapez.averageColors[self.fieldState].color
            else:
                pieceImageColor = (
                    Color.darkgrey
                    if self.piece.color == chess.BLACK
                    else Color.lightgrey
                )
        rcenter = self.rcenter()
        self.trapez.drawRCircle(image, rcenter, self.rPieceRadius, pieceImageColor)

getSquareImage(cbImage)

get the image of me within the given image

Source code in pcwawc/chesstrapezoid.py
680
681
682
683
684
def getSquareImage(self, cbImage):
    """get the image of me within the given image"""
    h, w, x, y, dh, dw = self.rxy2xy(cbImage.image)
    squareImage = cbImage.image[y : y + dh, x : x + dw]
    return squareImage

setPolygons(trapez, rtl_x, rtl_y, rtr_x, rtr_y, rbr_x, rbr_y, rbl_x, rbl_y)

set my relative and warped polygons from the given relative corner coordinates from top left via top right, bottom right to bottom left

Source code in pcwawc/chesstrapezoid.py
567
568
569
570
571
572
573
574
575
576
577
578
579
580
def setPolygons(
    self, trapez, rtl_x, rtl_y, rtr_x, rtr_y, rbr_x, rbr_y, rbl_x, rbl_y
):
    """set my relative and warped polygons from the given relative corner coordinates from top left via top right, bottom right to bottom left"""
    self.rpolygon = np.array(
        [(rtl_x, rtl_y), (rtr_x, rtr_y), (rbr_x, rbr_y), (rbl_x, rbl_y)]
    )
    self.idealPolygon = (self.rpolygon * trapez.idealSize).astype(np.int32)
    # function to use to calculate polygon
    r2t = trapez.relativeToTrapezXY
    self.polygon = np.array(
        [r2t(rtl_x, rtl_y), r2t(rtr_x, rtr_y), r2t(rbr_x, rbr_y), r2t(rbl_x, rbl_y)]
    )
    self.ipolygon = self.polygon.astype(np.int32)

squareChange(image, diffImage)

check the changes analyzing the difference image of this square

Source code in pcwawc/chesstrapezoid.py
686
687
688
689
690
691
692
693
694
695
def squareChange(self, image, diffImage):
    """check the changes analyzing the difference image of this square"""
    h, w, x, y, dh, dw = self.rxy2xy(image)

    self.squareImage = image[y : y + dh, x : x + dw]
    self.diffImage = diffImage[y : y + dh, x : x + dw]
    diffSum = np.sum(self.diffImage)
    # the value is 64 times lower then the per pixel value
    self.currentChange = SquareChange(diffSum / (h * w), self.changeStats)
    return self.currentChange

ChessTrapezoid

Bases: Trapez2Square

Chess board Trapezoid (UK) / Trapezium (US) / Trapez (DE) as seen via a webcam image

Source code in pcwawc/chesstrapezoid.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
class ChessTrapezoid(Trapez2Square):
    """Chess board Trapezoid (UK) / Trapezium (US) / Trapez (DE)  as seen via a webcam image"""

    debug = False
    colorDebug = False
    showDebugImage = False
    rows = 8
    cols = 8
    # default radius of pieces
    PieceRadiusFactor = 3
    DiffSumMovingAverageLength = 5

    def __init__(self, trapezPoints, idealSize=640, rotation=0, video=None):
        self.rotation = rotation
        # trapezPoints=[topLeft,topRight,bottomRight,bottomLeft]
        shifts = self.rotation // 90
        for shift in range(shifts):
            left = trapezPoints.pop(0)
            trapezPoints.append(left)
        topLeft, topRight, bottomRight, bottomLeft = trapezPoints
        super().__init__(topLeft, topRight, bottomRight, bottomLeft)
        self.setup(idealSize, video)

    def setup(self, idealSize=640, video=None):
        # video access (for debugging and partly hiding open cv details)
        if video is None:
            self.video = Video()
        self.idealSize = idealSize
        s = idealSize
        self.pts_IdealSquare = np.asarray(
            [[0.0, 0.0], [s, 0.0], [s, s], [0.0, s]], dtype=np.float32
        )
        self.inverseTransform = cv2.getPerspectiveTransform(
            self.pts_dst, self.pts_IdealSquare
        )
        self.rotation = 0
        # dict for average Colors
        self.averageColors = {}
        self.diffSumAverage = MovingAverage(ChessTrapezoid.DiffSumMovingAverageLength)
        # trapezoid representation of squares
        self.tsquares = {}
        for square in chess.SQUARES:
            tsquare = ChessTSquare(self, square)
            if ChessTrapezoid.debug:
                print(vars(tsquare))
            self.tsquares[tsquare.square] = tsquare

    def relativeToIdealXY(self, rx, ry):
        x = int(rx * self.idealSize)
        y = int(ry * self.idealSize)
        return x, y

    def tSquareAt(self, row, col, rotation=0):
        """get the trapezoid chessboard square for the given row and column"""
        row, col = self.rotateIndices(row, col, rotation)
        squareIndex = (ChessTrapezoid.rows - 1 - row) * ChessTrapezoid.cols + col
        square = chess.SQUARES[squareIndex]
        return self.tsquares[square]

    def rotateIndices(self, row, col, rotation):
        """rotate the indices or rows and columns according to the board rotation"""
        if rotation == 0:
            return row, col
        elif rotation == 90:
            return ChessTrapezoid.cols - 1 - col, row
        elif rotation == 180:
            return ChessTrapezoid.rows - 1 - row, ChessTrapezoid.cols - 1 - col
        elif rotation == 270:
            return col, ChessTrapezoid.rows - 1 - row
        else:
            raise Exception("invalid rotation %d for rotateIndices" % rotation)

    def genSquares(self):
        """generator for all chess squares"""
        for square in chess.SQUARES:
            tsquare = self.tsquares[square]
            yield tsquare

    def drawCircle(self, image, center, radius, color, thickness=-1):
        """draw a circle onto the given image at the given center point with the given radius, color and thickness."""
        if color is not None:
            cv2.circle(image, center, radius, color=color, thickness=thickness)

    def drawRCircle(self, image, rcenter, rradius, color, thickness=-1):
        """draw a circle with relative coordinates"""
        radius = int(rradius * self.idealSize)
        rx, ry = rcenter
        center = self.relativeToIdealXY(rx, ry)
        self.drawCircle(image, center, radius, color, thickness)

    def drawRCenteredText(self, image, text, rx, ry, color=(255, 255, 255)):
        x, y = self.relativeToIdealXY(rx, ry)
        self.video.drawCenteredText(image, text, x, y, fontBGRColor=color)

    def updatePieces(self, fen):
        """update the piece positions according to the given FEN"""
        self.board = chess.Board(fen)
        for tsquare in self.genSquares():
            piece = self.board.piece_at(tsquare.square)
            tsquare.piece = piece
            tsquare.fieldState = tsquare.getFieldState()

    def drawFieldStates(
        self, image, fieldStates, transformation=Transformation.ORIGINAL, channels=3
    ):
        """draw the states for fields with the given field states e.g. to set the mask image that will filter the trapezoid view according to piece positions when using maskImage"""
        if self.board is not None:
            for tsquare in self.genSquares():
                if tsquare.fieldState in fieldStates:
                    tsquare.drawState(image, transformation, channels)

    def prepareImageSet(self, cbImageSet):
        """prepare the image set"""
        cbWarped = self.warpedBoardImage(cbImageSet.cbImage.image)
        averageColors = self.analyzeColors(cbWarped)
        cbImageSet.cbWarped = cbWarped
        cbIdeal = self.idealColoredBoard(cbWarped.width, cbWarped.height)
        cbImageSet.cbIdeal = cbIdeal
        cbImageSet.cbPreMove = self.preMoveBoard(cbWarped.width, cbWarped.height)
        cbImageSet.cbDiff = cbWarped.diffBoardImage(cbIdeal)
        return averageColors

    def warpedBoardImage(self, image):
        warped = cv2.warpPerspective(
            image, self.inverseTransform, (self.idealSize, self.idealSize)
        )
        return ChessBoardImage(warped, "warped")

    def diffSum(self, image, other):
        # diffImage=self.diff(other)
        # return diffImage.sum()
        # https://stackoverflow.com/questions/17829092/opencv-cv2-absdiffimg1-img2-sum-without-temporary-img
        diffSumValue = cv2.norm(image, other, cv2.NORM_L1)
        if ChessTrapezoid.debug:
            print("diffSum %.0f" % (diffSumValue))
        return diffSumValue

    def idealColoredBoard(self, w, h, transformation=Transformation.IDEAL):
        """draw an 'ideal' colored board according to a given set of parameters e.g. fieldColor, pieceColor, pieceRadius"""
        idealImage = self.video.getEmptyImage4WidthAndHeight(w, h, 3)
        for tsquare in self.genSquares():
            tsquare.drawState(idealImage, transformation, 3)
        return ChessBoardImage(idealImage, "ideal")

    def preMoveBoard(self, w, h):
        """get an image of the board as it was before any move"""
        refImage = self.video.getEmptyImage4WidthAndHeight(w, h, 3)
        for tsquare in self.genSquares():
            tsquare.addPreMoveImage(refImage)
        return ChessBoardImage(refImage, "preMove ref")

    def drawDebug(self, image, color=(255, 255, 255)):
        """draw debug information e.g. piecel symbol and an onto the given image"""
        for square in chess.SQUARES:
            tsquare = self.tsquares[square]
            tsquare.drawDebug(image, color)

    def byFieldState(self):
        # get a dict of fields sorted by field state
        sortedTSquares = {}
        for fieldState in FieldState:
            sortedTSquares[fieldState] = []
        for tsquare in self.genSquares():
            sortedTSquares[tsquare.fieldState].append(tsquare)
        return sortedTSquares

    def analyzeColors(self, cbImage):
        """get the average colors per fieldState"""
        byFieldState = self.byFieldState()
        for fieldState in byFieldState.keys():
            mask = self.video.getEmptyImage(cbImage.image)
            self.drawFieldStates(mask, [fieldState], Transformation.IDEAL, 1)
            masked = self.video.maskImage(cbImage.image, mask)
            countedFields = len(byFieldState[fieldState])
            averageColor = Color(masked)
            self.averageColors[fieldState] = averageColor
            if ChessTrapezoid.showDebugImage:
                self.video.showImage(masked, fieldState.title())
            if ChessTrapezoid.colorDebug:
                print(
                    "%15s (%2d): %s" % (fieldState.title(), countedFields, averageColor)
                )
        return self.averageColors

    def optimizeColorCheck(self, cbImage, averageColors, debug=False):
        optimalSelectivity = -100
        colorStats = None
        for factor in [x * 0.05 for x in range(20, 41)]:
            """optimize the factor for the color check"""
            startc = timer()
            fieldColorStatsCandidate = self.checkColors(cbImage, averageColors, factor)
            endc = timer()
            fieldColorStatsCandidate.analyzeStats(factor, endc - startc)
            if fieldColorStatsCandidate.minSelectivity > optimalSelectivity:
                optimalSelectivity = fieldColorStatsCandidate.minSelectivity
                colorStats = fieldColorStatsCandidate
                if debug:
                    print(
                        "selectivity %5.1f white: %5.1f black: %5.1f "
                        % (
                            self.minSelectivity,
                            self.whiteSelectivity,
                            self.blackSelectivity,
                        )
                    )
        return colorStats

    def checkColors(self, cbImage, averageColors, rangeFactor=1.0):
        """check the colors against the expectation"""
        byFieldState = self.byFieldState()
        colorStats = FieldColorStats()
        for fieldState in byFieldState.keys():
            # https://stackoverflow.com/questions/54019108/how-to-count-the-pixels-of-a-certain-color-with-opencv
            if fieldState in [
                FieldState.WHITE_BLACK,
                FieldState.WHITE_EMPTY,
                FieldState.WHITE_WHITE,
            ]:
                averageColor = averageColors[FieldState.WHITE_EMPTY]
            else:
                averageColor = averageColors[FieldState.BLACK_EMPTY]
            fields = byFieldState[fieldState]
            lower, upper = averageColor.colorRange(rangeFactor)
            if ChessTrapezoid.colorDebug:
                print(
                    "%25s (%2d): %s -> %s - %s"
                    % (fieldState.title(), len(fields), averageColor, lower, upper)
                )
            for tsquare in fields:
                squareImage = tsquare.getSquareImage(cbImage)
                asExpected = cv2.inRange(squareImage, lower, upper)
                h, w = squareImage.shape[:2]
                pixels = h * w
                nonzero = cv2.countNonZero(asExpected)
                colorStats.push(fieldState, tsquare.an, nonzero / pixels * 100)
                # self.video.showImage(asExpected,tsquare.an)
        return colorStats

    def detectChanges(self, cbImageSet, detectState):
        """detect the changes of the given imageset using the given detect state machine"""
        detectState.nextFrame()
        changes = {}
        validChanges = 0
        diffSum = 0
        cbImage = cbImageSet.cbImage
        cbDiff = cbImageSet.cbDiff
        for tsquare in self.genSquares():
            squareChange = tsquare.squareChange(cbImage.image, cbDiff.image)
            changes[tsquare.an] = squareChange
            diffSum += abs(squareChange.diff)
            if squareChange.valid:
                validChanges += 1
            # if self.frames==1:
            #    tsquare.preMoveImage=np.copy(tsquare.squareImage)

        self.diffSumAverage.push(diffSum)
        diffSumDelta = self.diffSumAverage.mean() - diffSum
        detectState.check(
            validChanges, diffSum, diffSumDelta, squareChange.meanFrameCount
        )
        for tsquare in self.genSquares():
            squareChange = changes[tsquare.an]
            tsquare.checkMoved(detectState)

        changes["validBoard"] = detectState.validBoard
        changes["valid"] = validChanges
        changes["diffSum"] = diffSum
        changes["diffSumDelta"] = diffSumDelta
        changes["validFrames"] = detectState.validFrames
        changes["invalidFrames"] = detectState.invalidFrames
        return changes

analyzeColors(cbImage)

get the average colors per fieldState

Source code in pcwawc/chesstrapezoid.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def analyzeColors(self, cbImage):
    """get the average colors per fieldState"""
    byFieldState = self.byFieldState()
    for fieldState in byFieldState.keys():
        mask = self.video.getEmptyImage(cbImage.image)
        self.drawFieldStates(mask, [fieldState], Transformation.IDEAL, 1)
        masked = self.video.maskImage(cbImage.image, mask)
        countedFields = len(byFieldState[fieldState])
        averageColor = Color(masked)
        self.averageColors[fieldState] = averageColor
        if ChessTrapezoid.showDebugImage:
            self.video.showImage(masked, fieldState.title())
        if ChessTrapezoid.colorDebug:
            print(
                "%15s (%2d): %s" % (fieldState.title(), countedFields, averageColor)
            )
    return self.averageColors

checkColors(cbImage, averageColors, rangeFactor=1.0)

check the colors against the expectation

Source code in pcwawc/chesstrapezoid.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def checkColors(self, cbImage, averageColors, rangeFactor=1.0):
    """check the colors against the expectation"""
    byFieldState = self.byFieldState()
    colorStats = FieldColorStats()
    for fieldState in byFieldState.keys():
        # https://stackoverflow.com/questions/54019108/how-to-count-the-pixels-of-a-certain-color-with-opencv
        if fieldState in [
            FieldState.WHITE_BLACK,
            FieldState.WHITE_EMPTY,
            FieldState.WHITE_WHITE,
        ]:
            averageColor = averageColors[FieldState.WHITE_EMPTY]
        else:
            averageColor = averageColors[FieldState.BLACK_EMPTY]
        fields = byFieldState[fieldState]
        lower, upper = averageColor.colorRange(rangeFactor)
        if ChessTrapezoid.colorDebug:
            print(
                "%25s (%2d): %s -> %s - %s"
                % (fieldState.title(), len(fields), averageColor, lower, upper)
            )
        for tsquare in fields:
            squareImage = tsquare.getSquareImage(cbImage)
            asExpected = cv2.inRange(squareImage, lower, upper)
            h, w = squareImage.shape[:2]
            pixels = h * w
            nonzero = cv2.countNonZero(asExpected)
            colorStats.push(fieldState, tsquare.an, nonzero / pixels * 100)
            # self.video.showImage(asExpected,tsquare.an)
    return colorStats

detectChanges(cbImageSet, detectState)

detect the changes of the given imageset using the given detect state machine

Source code in pcwawc/chesstrapezoid.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def detectChanges(self, cbImageSet, detectState):
    """detect the changes of the given imageset using the given detect state machine"""
    detectState.nextFrame()
    changes = {}
    validChanges = 0
    diffSum = 0
    cbImage = cbImageSet.cbImage
    cbDiff = cbImageSet.cbDiff
    for tsquare in self.genSquares():
        squareChange = tsquare.squareChange(cbImage.image, cbDiff.image)
        changes[tsquare.an] = squareChange
        diffSum += abs(squareChange.diff)
        if squareChange.valid:
            validChanges += 1
        # if self.frames==1:
        #    tsquare.preMoveImage=np.copy(tsquare.squareImage)

    self.diffSumAverage.push(diffSum)
    diffSumDelta = self.diffSumAverage.mean() - diffSum
    detectState.check(
        validChanges, diffSum, diffSumDelta, squareChange.meanFrameCount
    )
    for tsquare in self.genSquares():
        squareChange = changes[tsquare.an]
        tsquare.checkMoved(detectState)

    changes["validBoard"] = detectState.validBoard
    changes["valid"] = validChanges
    changes["diffSum"] = diffSum
    changes["diffSumDelta"] = diffSumDelta
    changes["validFrames"] = detectState.validFrames
    changes["invalidFrames"] = detectState.invalidFrames
    return changes

drawCircle(image, center, radius, color, thickness=-1)

draw a circle onto the given image at the given center point with the given radius, color and thickness.

Source code in pcwawc/chesstrapezoid.py
159
160
161
162
def drawCircle(self, image, center, radius, color, thickness=-1):
    """draw a circle onto the given image at the given center point with the given radius, color and thickness."""
    if color is not None:
        cv2.circle(image, center, radius, color=color, thickness=thickness)

drawDebug(image, color=(255, 255, 255))

draw debug information e.g. piecel symbol and an onto the given image

Source code in pcwawc/chesstrapezoid.py
232
233
234
235
236
def drawDebug(self, image, color=(255, 255, 255)):
    """draw debug information e.g. piecel symbol and an onto the given image"""
    for square in chess.SQUARES:
        tsquare = self.tsquares[square]
        tsquare.drawDebug(image, color)

drawFieldStates(image, fieldStates, transformation=Transformation.ORIGINAL, channels=3)

draw the states for fields with the given field states e.g. to set the mask image that will filter the trapezoid view according to piece positions when using maskImage

Source code in pcwawc/chesstrapezoid.py
183
184
185
186
187
188
189
190
def drawFieldStates(
    self, image, fieldStates, transformation=Transformation.ORIGINAL, channels=3
):
    """draw the states for fields with the given field states e.g. to set the mask image that will filter the trapezoid view according to piece positions when using maskImage"""
    if self.board is not None:
        for tsquare in self.genSquares():
            if tsquare.fieldState in fieldStates:
                tsquare.drawState(image, transformation, channels)

drawRCircle(image, rcenter, rradius, color, thickness=-1)

draw a circle with relative coordinates

Source code in pcwawc/chesstrapezoid.py
164
165
166
167
168
169
def drawRCircle(self, image, rcenter, rradius, color, thickness=-1):
    """draw a circle with relative coordinates"""
    radius = int(rradius * self.idealSize)
    rx, ry = rcenter
    center = self.relativeToIdealXY(rx, ry)
    self.drawCircle(image, center, radius, color, thickness)

genSquares()

generator for all chess squares

Source code in pcwawc/chesstrapezoid.py
153
154
155
156
157
def genSquares(self):
    """generator for all chess squares"""
    for square in chess.SQUARES:
        tsquare = self.tsquares[square]
        yield tsquare

idealColoredBoard(w, h, transformation=Transformation.IDEAL)

draw an 'ideal' colored board according to a given set of parameters e.g. fieldColor, pieceColor, pieceRadius

Source code in pcwawc/chesstrapezoid.py
218
219
220
221
222
223
def idealColoredBoard(self, w, h, transformation=Transformation.IDEAL):
    """draw an 'ideal' colored board according to a given set of parameters e.g. fieldColor, pieceColor, pieceRadius"""
    idealImage = self.video.getEmptyImage4WidthAndHeight(w, h, 3)
    for tsquare in self.genSquares():
        tsquare.drawState(idealImage, transformation, 3)
    return ChessBoardImage(idealImage, "ideal")

preMoveBoard(w, h)

get an image of the board as it was before any move

Source code in pcwawc/chesstrapezoid.py
225
226
227
228
229
230
def preMoveBoard(self, w, h):
    """get an image of the board as it was before any move"""
    refImage = self.video.getEmptyImage4WidthAndHeight(w, h, 3)
    for tsquare in self.genSquares():
        tsquare.addPreMoveImage(refImage)
    return ChessBoardImage(refImage, "preMove ref")

prepareImageSet(cbImageSet)

prepare the image set

Source code in pcwawc/chesstrapezoid.py
192
193
194
195
196
197
198
199
200
201
def prepareImageSet(self, cbImageSet):
    """prepare the image set"""
    cbWarped = self.warpedBoardImage(cbImageSet.cbImage.image)
    averageColors = self.analyzeColors(cbWarped)
    cbImageSet.cbWarped = cbWarped
    cbIdeal = self.idealColoredBoard(cbWarped.width, cbWarped.height)
    cbImageSet.cbIdeal = cbIdeal
    cbImageSet.cbPreMove = self.preMoveBoard(cbWarped.width, cbWarped.height)
    cbImageSet.cbDiff = cbWarped.diffBoardImage(cbIdeal)
    return averageColors

rotateIndices(row, col, rotation)

rotate the indices or rows and columns according to the board rotation

Source code in pcwawc/chesstrapezoid.py
140
141
142
143
144
145
146
147
148
149
150
151
def rotateIndices(self, row, col, rotation):
    """rotate the indices or rows and columns according to the board rotation"""
    if rotation == 0:
        return row, col
    elif rotation == 90:
        return ChessTrapezoid.cols - 1 - col, row
    elif rotation == 180:
        return ChessTrapezoid.rows - 1 - row, ChessTrapezoid.cols - 1 - col
    elif rotation == 270:
        return col, ChessTrapezoid.rows - 1 - row
    else:
        raise Exception("invalid rotation %d for rotateIndices" % rotation)

tSquareAt(row, col, rotation=0)

get the trapezoid chessboard square for the given row and column

Source code in pcwawc/chesstrapezoid.py
133
134
135
136
137
138
def tSquareAt(self, row, col, rotation=0):
    """get the trapezoid chessboard square for the given row and column"""
    row, col = self.rotateIndices(row, col, rotation)
    squareIndex = (ChessTrapezoid.rows - 1 - row) * ChessTrapezoid.cols + col
    square = chess.SQUARES[squareIndex]
    return self.tsquares[square]

updatePieces(fen)

update the piece positions according to the given FEN

Source code in pcwawc/chesstrapezoid.py
175
176
177
178
179
180
181
def updatePieces(self, fen):
    """update the piece positions according to the given FEN"""
    self.board = chess.Board(fen)
    for tsquare in self.genSquares():
        piece = self.board.piece_at(tsquare.square)
        tsquare.piece = piece
        tsquare.fieldState = tsquare.getFieldState()

Color

Color definitions with maximum lightness difference and calculation of average color for a sample of square with a given fieldState

Source code in pcwawc/chesstrapezoid.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
class Color:
    """Color definitions with maximum lightness difference and calculation of average color for a sample of square with a given fieldState"""

    white = (255, 255, 255)
    lightgrey = (170, 170, 170)
    darkgrey = (85, 85, 85)
    black = (0, 0, 0)
    debug = False

    def __init__(self, image):
        """pick the average color from the given image"""
        # https://stackoverflow.com/a/43112217/1497139
        (means, stds) = cv2.meanStdDev(image)
        pixels, nonzero = Color.countNonZero(image)
        # exotic case of a totally black picture
        if nonzero == 0:
            self.color = (0, 0, 0)
            self.stds = (0, 0, 0)
        else:
            self.color, self.stds = self.fixMeans(means, stds, pixels, nonzero)

    @staticmethod
    def countNonZero(image):
        # https://stackoverflow.com/a/55163686/1497139
        b = image[:, :, 0]
        g = image[:, :, 1]
        r = image[:, :, 2]
        h, w = image.shape[:2]
        pixels = h * w
        nonzerotupel = cv2.countNonZero(b), cv2.countNonZero(g), cv2.countNonZero(r)
        nonzero = max(nonzerotupel)
        return pixels, nonzero

    def __str__(self):
        b, g, r = self.color
        bs, gs, rs = self.stds
        s = "%3d, %3d, %3d ± %3d, %3d, %3d " % (b, g, r, bs, gs, rs)
        return s

    def fix(self, value):
        return 0 if value < 0 else 255 if value > 255 else value

    def colorRange(self, rangeFactor):
        b, g, r = self.color
        bs, gs, rs = self.stds
        rf = rangeFactor
        lower = np.array(
            [self.fix(b - bs * rf), self.fix(g - gs * rf), self.fix(r - rs * rf)],
            dtype="uint8",
        )
        upper = np.array(
            [self.fix(b + bs * rf), self.fix(g + gs * rf), self.fix(r + rs * rf)],
            dtype="uint8",
        )
        return lower, upper

    def fixMeans(self, means, stds, pixels, nonzero):
        """fix the zero based means to nonzero based see https://stackoverflow.com/a/58891531/1497139"""
        gmean, bmean, rmean = means.flatten()
        gstds, bstds, rstds = stds.flatten()
        if Color.debug:
            print("means %.2f %.2f %.2f " % (gmean, bmean, rmean))
            print("stds  %.2f %.2f %.2f " % (gstds, bstds, rstds))
        factor = pixels / nonzero
        fgmean = gmean * factor
        fbmean = bmean * factor
        frmean = rmean * factor
        if Color.debug:
            print("non-zero means %.2f %.2f %.2f" % (fgmean, fbmean, frmean))
        fsqsumb = (bstds * bstds + bmean * bmean) * pixels
        fsqsumg = (gstds * gstds + gmean * gmean) * pixels
        fsqsumr = (rstds * rstds + rmean * rmean) * pixels
        if Color.debug:
            print("fsqsum %.2f %.2f %.2f" % (fsqsumb, fsqsumg, fsqsumr))
        fstdsb = math.sqrt(max(fsqsumb / nonzero - fbmean * fbmean, 0))
        fstdsg = math.sqrt(max(fsqsumg / nonzero - fgmean * fgmean, 0))
        fstdsr = math.sqrt(max(fsqsumr / nonzero - frmean * frmean, 0))
        if Color.debug:
            print("non-zero stds %.2f %.2f %.2f" % (fstdsb, fstdsg, fstdsr))
        fixedmeans = fgmean, fbmean, frmean
        fixedstds = fstdsb, fstdsg, fstdsr
        return fixedmeans, fixedstds

__init__(image)

pick the average color from the given image

Source code in pcwawc/chesstrapezoid.py
423
424
425
426
427
428
429
430
431
432
433
def __init__(self, image):
    """pick the average color from the given image"""
    # https://stackoverflow.com/a/43112217/1497139
    (means, stds) = cv2.meanStdDev(image)
    pixels, nonzero = Color.countNonZero(image)
    # exotic case of a totally black picture
    if nonzero == 0:
        self.color = (0, 0, 0)
        self.stds = (0, 0, 0)
    else:
        self.color, self.stds = self.fixMeans(means, stds, pixels, nonzero)

fixMeans(means, stds, pixels, nonzero)

fix the zero based means to nonzero based see https://stackoverflow.com/a/58891531/1497139

Source code in pcwawc/chesstrapezoid.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def fixMeans(self, means, stds, pixels, nonzero):
    """fix the zero based means to nonzero based see https://stackoverflow.com/a/58891531/1497139"""
    gmean, bmean, rmean = means.flatten()
    gstds, bstds, rstds = stds.flatten()
    if Color.debug:
        print("means %.2f %.2f %.2f " % (gmean, bmean, rmean))
        print("stds  %.2f %.2f %.2f " % (gstds, bstds, rstds))
    factor = pixels / nonzero
    fgmean = gmean * factor
    fbmean = bmean * factor
    frmean = rmean * factor
    if Color.debug:
        print("non-zero means %.2f %.2f %.2f" % (fgmean, fbmean, frmean))
    fsqsumb = (bstds * bstds + bmean * bmean) * pixels
    fsqsumg = (gstds * gstds + gmean * gmean) * pixels
    fsqsumr = (rstds * rstds + rmean * rmean) * pixels
    if Color.debug:
        print("fsqsum %.2f %.2f %.2f" % (fsqsumb, fsqsumg, fsqsumr))
    fstdsb = math.sqrt(max(fsqsumb / nonzero - fbmean * fbmean, 0))
    fstdsg = math.sqrt(max(fsqsumg / nonzero - fgmean * fgmean, 0))
    fstdsr = math.sqrt(max(fsqsumr / nonzero - frmean * frmean, 0))
    if Color.debug:
        print("non-zero stds %.2f %.2f %.2f" % (fstdsb, fstdsg, fstdsr))
    fixedmeans = fgmean, fbmean, frmean
    fixedstds = fstdsb, fstdsg, fstdsr
    return fixedmeans, fixedstds

FieldColorStats

Bases: object

Color statistics for Fields

Source code in pcwawc/chesstrapezoid.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
class FieldColorStats(object):
    """Color statistics for Fields"""

    def __init__(self):
        self.stats = {}
        self.colorPercent = {}
        for fieldState in FieldState:
            self.stats[fieldState] = MinMaxStats()

    def push(self, fieldState, an, percent):
        self.colorPercent[an] = percent
        self.stats[fieldState].push(percent)

    def analyzeStats(self, factor, time, debug=False):
        self.factor = factor
        self.whiteEmptyMin = self.stats[FieldState.WHITE_EMPTY].min
        self.whiteFilledMax = max(
            self.stats[FieldState.WHITE_BLACK].max,
            self.stats[FieldState.WHITE_WHITE].max,
        )
        self.whiteSelectivity = self.whiteEmptyMin - self.whiteFilledMax
        self.blackEmptyMin = self.stats[FieldState.BLACK_EMPTY].min
        self.blackFilledMax = max(
            self.stats[FieldState.BLACK_BLACK].max,
            self.stats[FieldState.BLACK_WHITE].max,
        )
        self.blackSelectivity = self.blackEmptyMin - self.blackFilledMax
        self.minSelectivity = min(self.whiteSelectivity, self.blackSelectivity)
        if debug:
            self.showDebug(time)

    def showDebug(self, time):
        print("color check %.3f s with factor %.2f" % ((time), self.factor))
        for fieldState in FieldState:
            self.showFieldStateDebug(fieldState)

    def showFieldStateDebug(self, fieldState):
        print(
            "%20s: %s"
            % (
                fieldState.title(),
                self.stats[fieldState].formatMinMax(
                    formatR="%2d: %4.1f ± %4.1f", formatM=" %4.1f - %4.1f"
                ),
            )
        )

    def showStatsDebug(self, time):
        print(
            "%.3fs for color check optimization factor: %5.1f selectivity min %5.1f,white: %5.1f black: %5.1f"
            % (
                time,
                self.factor,
                self.minSelectivity,
                self.whiteSelectivity,
                self.blackSelectivity,
            )
        )

SquareChange

keep track of changes of a square over time

Source code in pcwawc/chesstrapezoid.py
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
class SquareChange:
    """keep track of changes of a square over time"""

    meanFrameCount = 10
    treshold = 0.2

    def __init__(self, value, stats):
        """construct me from the given value with the given running stats"""
        self.value = value
        self.mean = stats.mean()
        self.diff = value - self.mean
        self.variance = stats.variance()
        if stats.n < SquareChange.meanFrameCount:
            stats.push(value)
            self.valid = False
            self.diff = 0
        else:
            self.valid = abs(self.diff) < SquareChange.treshold

    def push(self, stats, value):
        if self.valid:
            stats.push(value)

__init__(value, stats)

construct me from the given value with the given running stats

Source code in pcwawc/chesstrapezoid.py
504
505
506
507
508
509
510
511
512
513
514
515
def __init__(self, value, stats):
    """construct me from the given value with the given running stats"""
    self.value = value
    self.mean = stats.mean()
    self.diff = value - self.mean
    self.variance = stats.variance()
    if stats.n < SquareChange.meanFrameCount:
        stats.push(value)
        self.valid = False
        self.diff = 0
    else:
        self.valid = abs(self.diff) < SquareChange.treshold

Transformation

Bases: IntEnum

Transformation kind

Source code in pcwawc/chesstrapezoid.py
24
25
26
27
28
29
class Transformation(IntEnum):
    """Transformation kind"""

    RELATIVE = 0  # 1.0 x 1.0
    IDEAL = 1  # e.g. 640x640
    ORIGINAL = 2  # whatever the image size is

Trapez2Square

transform a trapez to a square and back as needed

Source code in pcwawc/chesstrapezoid.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class Trapez2Square:
    """transform a trapez to a square and back as needed"""

    def __init__(self, topLeft, topRight, bottomRight, bottomLeft):
        """construct me from the given corner points"""
        self.tl, self.tr, self.br, self.bl = topLeft, topRight, bottomRight, bottomLeft
        self.polygon = np.array(
            [topLeft, topRight, bottomRight, bottomLeft], dtype=np.int32
        )
        # prepare the perspective transformation
        # https://stackoverflow.com/questions/27585355/python-open-cv-perspectivetransform
        # https://stackoverflow.com/a/41768610/1497139
        # the destination
        self.pts_dst = np.asarray(
            [topLeft, topRight, bottomRight, bottomLeft], dtype=np.float32
        )
        # the normed square described as a polygon in clockwise direction with an origin at top left
        self.pts_normedSquare = np.asarray(
            [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], dtype=np.float32
        )
        self.transform = cv2.getPerspectiveTransform(
            self.pts_normedSquare, self.pts_dst
        )

    def relativeToTrapezXY(self, rx, ry):
        """convert a relative 0-1 based coordinate to a coordinate in the trapez"""
        # see https://math.stackexchange.com/questions/2084647/obtain-two-dimensional-linear-space-on-trapezoid-shape
        # https://stackoverflow.com/a/33303869/1497139
        rxry = np.asarray([[rx, ry]], dtype=np.float32)
        # target array - values are irrelevant because the will be overridden
        xya = cv2.perspectiveTransform(np.array([rxry]), self.transform)
        # example result:
        # ndarray: [[[20. 40.]]]
        xy = xya[0][0]
        x, y = xy[0], xy[1]
        return x, y

    def relativeTrapezToTrapezXY(self, rx1, ry1, rx2, ry2):
        return np.asarray(
            [
                self.relativeToTrapezXY(rx1, ry1),
                self.relativeToTrapezXY(rx2, ry1),
                self.relativeToTrapezXY(rx2, ry2),
                self.relativeToTrapezXY(rx1, ry2),
            ],
            dtype=np.int32,
        )

__init__(topLeft, topRight, bottomRight, bottomLeft)

construct me from the given corner points

Source code in pcwawc/chesstrapezoid.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, topLeft, topRight, bottomRight, bottomLeft):
    """construct me from the given corner points"""
    self.tl, self.tr, self.br, self.bl = topLeft, topRight, bottomRight, bottomLeft
    self.polygon = np.array(
        [topLeft, topRight, bottomRight, bottomLeft], dtype=np.int32
    )
    # prepare the perspective transformation
    # https://stackoverflow.com/questions/27585355/python-open-cv-perspectivetransform
    # https://stackoverflow.com/a/41768610/1497139
    # the destination
    self.pts_dst = np.asarray(
        [topLeft, topRight, bottomRight, bottomLeft], dtype=np.float32
    )
    # the normed square described as a polygon in clockwise direction with an origin at top left
    self.pts_normedSquare = np.asarray(
        [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], dtype=np.float32
    )
    self.transform = cv2.getPerspectiveTransform(
        self.pts_normedSquare, self.pts_dst
    )

relativeToTrapezXY(rx, ry)

convert a relative 0-1 based coordinate to a coordinate in the trapez

Source code in pcwawc/chesstrapezoid.py
56
57
58
59
60
61
62
63
64
65
66
67
def relativeToTrapezXY(self, rx, ry):
    """convert a relative 0-1 based coordinate to a coordinate in the trapez"""
    # see https://math.stackexchange.com/questions/2084647/obtain-two-dimensional-linear-space-on-trapezoid-shape
    # https://stackoverflow.com/a/33303869/1497139
    rxry = np.asarray([[rx, ry]], dtype=np.float32)
    # target array - values are irrelevant because the will be overridden
    xya = cv2.perspectiveTransform(np.array([rxry]), self.transform)
    # example result:
    # ndarray: [[[20. 40.]]]
    xy = xya[0][0]
    x, y = xy[0], xy[1]
    return x, y

chessvision

Created on 2019-10-10

@author: wf see e.g. https://www.fide.com/FIDE/handbook/LawsOfChess.pdf

FieldState

Bases: IntEnum

the state of a field is a combination of the field color with a piece color + two empty field color options

Source code in pcwawc/chessvision.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class FieldState(IntEnum):
    """the state of a field is a combination of the field color with a piece color + two empty field color options"""

    WHITE_EMPTY = 0
    WHITE_WHITE = 1
    WHITE_BLACK = 2
    BLACK_EMPTY = 3
    BLACK_WHITE = 4
    BLACK_BLACK = 5

    def title(
        self,
        titles=[
            "white empty",
            "white on white",
            "black on white",
            "black empty",
            "white on black",
            "black on black",
        ],
    ):
        return titles[self]

IChessBoard

Bases: Interface

chessboard

Source code in pcwawc/chessvision.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class IChessBoard(Interface):
    """chessboard"""

    fen = Attribute("Forsyth–Edwards Notation")

    def divideInSquares(self, width, height):
        """divide the board in squares based on the given width and height"""
        pass

    def genSquares(self):
        """generate all my squares"""
        pass

    def updatePieces(self, fen):
        """update the piece positions according to the given FEN"""
        pass

divideInSquares(width, height)

divide the board in squares based on the given width and height

Source code in pcwawc/chessvision.py
61
62
63
def divideInSquares(self, width, height):
    """divide the board in squares based on the given width and height"""
    pass

genSquares()

generate all my squares

Source code in pcwawc/chessvision.py
65
66
67
def genSquares(self):
    """generate all my squares"""
    pass

updatePieces(fen)

update the piece positions according to the given FEN

Source code in pcwawc/chessvision.py
69
70
71
def updatePieces(self, fen):
    """update the piece positions according to the given FEN"""
    pass

IChessBoardImage

Bases: Interface

a single image of a chessboard

Source code in pcwawc/chessvision.py
148
149
150
151
152
153
154
155
156
157
158
159
class IChessBoardImage(Interface):
    """a single image of a chessboard"""

    image = Attribute("the chessboard image")
    width = Attribute("width of the image")
    height = Attribute("height of the image")
    pixels = Attribute("number of pixels of image = widthxheight")
    title = Attribute("title of the image")

    def diffBoardImage(self, cbOther):
        """get the difference image between me an the other chessboard image"""
        pass

diffBoardImage(cbOther)

get the difference image between me an the other chessboard image

Source code in pcwawc/chessvision.py
157
158
159
def diffBoardImage(self, cbOther):
    """get the difference image between me an the other chessboard image"""
    pass

IChessBoardImageSet

Bases: Interface

a set of Images

Source code in pcwawc/chessvision.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class IChessBoardImageSet(Interface):
    """a set of Images"""

    frameIndex = Attribute("index of image in a sequence")
    timeStamp = Attribute("time")
    cbImage = Attribute("original chessboard image")
    cbGUI = Attribute("the chessboard image to be displayed in the GUI")
    cbWarped = Attribute("chessboard image warped to square size")
    cbIdeal = Attribute("ideal chessboard image constructed from parameters")
    cbPreMove = Attribute("chessboard image before move")
    cbDiff = Attribute("chessboard image difference to premove state")

    def warpAndRotate(self):
        """warp and rotate the original image"""

    def prepareGUI(self):
        """prepare the gui output e.g. for debugging"""
        pass

prepareGUI()

prepare the gui output e.g. for debugging

Source code in pcwawc/chessvision.py
143
144
145
def prepareGUI(self):
    """prepare the gui output e.g. for debugging"""
    pass

warpAndRotate()

warp and rotate the original image

Source code in pcwawc/chessvision.py
140
141
def warpAndRotate(self):
    """warp and rotate the original image"""

IChessBoardVision

Bases: Interface

Source code in pcwawc/chessvision.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class IChessBoardVision(Interface):
    title = Attribute("name/title of the chessboard observation")
    warp = Attribute("the trapzeoid coordinates for warping to a square image")
    args = Attribute("the command line arguments/settings")
    debug = Attribute("true for debugging")
    """ visual observer of a chessboard e.g. with a camera - a video or still image"""

    def open(self, device):
        """open the access to the chessboard images via the given device e.g. device number or filepath"""
        pass

    def readChessBoardImage(self):
        """read a chessboard Image and return it"""
        pass

    def close(self):
        """close the access to the chessboard images"""
        pass

debug = Attribute('true for debugging') class-attribute instance-attribute

visual observer of a chessboard e.g. with a camera - a video or still image

close()

close the access to the chessboard images

Source code in pcwawc/chessvision.py
108
109
110
def close(self):
    """close the access to the chessboard images"""
    pass

open(device)

open the access to the chessboard images via the given device e.g. device number or filepath

Source code in pcwawc/chessvision.py
100
101
102
def open(self, device):
    """open the access to the chessboard images via the given device e.g. device number or filepath"""
    pass

readChessBoardImage()

read a chessboard Image and return it

Source code in pcwawc/chessvision.py
104
105
106
def readChessBoardImage(self):
    """read a chessboard Image and return it"""
    pass

IGame

Bases: Interface

a chess game

Source code in pcwawc/chessvision.py
50
51
52
53
class IGame(Interface):
    """a chess game"""

    pgn = Attribute("Portable game notation")

IMoveDetector

Bases: Interface

a detector for moves on a chessboard image

Source code in pcwawc/chessvision.py
162
163
164
165
166
167
168
169
170
171
172
173
class IMoveDetector(Interface):
    """a detector for moves on a chessboard image"""

    name = Attribute("name of the detector")
    debug = Attribute("true for debugging")

    def setup(self, name, board, video, args):
        """setup the detector with the given board, video and arguments"""

    def onChessBoardImage(self, imageEvent):
        """event handler for image events"""
        pass

onChessBoardImage(imageEvent)

event handler for image events

Source code in pcwawc/chessvision.py
171
172
173
def onChessBoardImage(self, imageEvent):
    """event handler for image events"""
    pass

setup(name, board, video, args)

setup the detector with the given board, video and arguments

Source code in pcwawc/chessvision.py
168
169
def setup(self, name, board, video, args):
    """setup the detector with the given board, video and arguments"""

IPiece

Bases: Interface

a chess piece King,Queen,Bishop,Knight,Rook or Pawn

Source code in pcwawc/chessvision.py
87
88
89
90
class IPiece(Interface):
    """a chess piece King,Queen,Bishop,Knight,Rook or Pawn"""

    color = Attribute("color of the piece")

ISquare

Bases: Interface

one of the 64 square fields of a chessboard

Source code in pcwawc/chessvision.py
74
75
76
77
78
79
80
81
82
83
84
class ISquare(Interface):
    """one of the 64 square fields of a chessboard"""

    board = Attribute("the chessboard this square belongs to")
    an = Attribute("algebraic notation")
    fieldColor = Attribute("color of the empty square")
    piece = Attribute("chess piece currently on the square - may be None")

    def getSquareImage(self, cbImage):
        """get the 1/64 subimage of this square for the given chessboard image"""
        pass

getSquareImage(cbImage)

get the 1/64 subimage of this square for the given chessboard image

Source code in pcwawc/chessvision.py
82
83
84
def getSquareImage(self, cbImage):
    """get the 1/64 subimage of this square for the given chessboard image"""
    pass

IWarp

Bases: Interface

trapez to square warp point handling

Source code in pcwawc/chessvision.py
113
114
115
116
117
118
119
120
121
122
123
124
125
class IWarp(Interface):
    """trapez to square warp point handling"""

    def rotate(self, angle):
        """rotate me with the given angle"""
        pass

    def updatePoints(self):
        """update the points"""

    def addPoint(self, px, py):
        """add the given point to the warp point list"""
        pass

addPoint(px, py)

add the given point to the warp point list

Source code in pcwawc/chessvision.py
123
124
125
def addPoint(self, px, py):
    """add the given point to the warp point list"""
    pass

rotate(angle)

rotate me with the given angle

Source code in pcwawc/chessvision.py
116
117
118
def rotate(self, angle):
    """rotate me with the given angle"""
    pass

updatePoints()

update the points

Source code in pcwawc/chessvision.py
120
121
def updatePoints(self):
    """update the points"""

config

Created on 2019-12-31

@author: wf

Config

configuration for Play Chess With a WebCam

Source code in pcwawc/config.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Config:
    """configuration for Play Chess With a WebCam"""

    def __init__(self, configFile, config=None):
        self.configFile = configFile
        if config is not None:
            self.config = config
        else:
            if not os.path.isfile(configFile):
                print(
                    "%s is missing please create it if you'd like to e.g. use the lichess bot api"
                    % (configFile)
                )
                self.config = {}
            else:
                self.config = yaml.load(open(self.configFile), Loader=yaml.FullLoader)

    @staticmethod
    def default(configName="config", configPath=os.getenv("HOME") + "/.pcwawc/"):
        configFile = configPath + configName + ".yaml"
        return Config(configFile)

detectorfactory

Created on 2019-12-14

@author: wf

detectstate

Created on 27.11.2019

we could have used: see https://github.com/pytransitions/transitions see https://www.zeolearn.com/magazine/writing-maintainable-code-using-sate-machines-in-python but we don't for the time being

@author: wf

ChangeState

Bases: IntEnum

Source code in pcwawc/detectstate.py
18
19
20
21
22
23
24
25
26
27
class ChangeState(IntEnum):
    """ """

    CALIBRATING = 0
    PRE_MOVE = 1
    IN_MOVE = 2
    POTENTIAL_MOVE = 5

    def title(self, titles=["calibrating", "pre move", "in move", "potential move"]):
        return titles[self]

DetectColorState

Bases: object

detect state from Color Distribution

Source code in pcwawc/detectstate.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class DetectColorState(object):
    """detect state from Color Distribution"""

    def __init__(self, trapez):
        self.frames = 0
        self.trapez = trapez
        self.preMoveStats = None
        self.imagePath = Environment.debugImagePath + "states/"
        Environment.checkDir(self.imagePath)

    def check(self, cbImage, averageColors, drawDebug=False):
        self.frames += 1
        startco = timer()
        self.averageColors = averageColors
        self.image = cbImage.image
        self.fieldColorStats = self.trapez.optimizeColorCheck(cbImage, averageColors)
        endco = timer()
        if drawDebug:
            self.fieldColorStats.showStatsDebug(endco - startco)
            self.drawDebug()
        valid = self.fieldColorStats.minSelectivity > 0
        if valid:
            self.preMoveStats = self.fieldColorStats
        if not valid and self.preMoveStats is not None:
            for tSquare in self.trapez.genSquares():
                state = self.squareState(
                    self.preMoveStats,
                    tSquare,
                    self.fieldColorStats.colorPercent[tSquare.an],
                )

    def inRange(self, stats, fs, percent):
        """check if the given percent value is in the range for the given fieldStats"""
        minValue = stats[fs].min
        maxValue = stats[fs].max
        return percent >= minValue and percent <= maxValue

    def squareState(self, fieldColorStats, tSquare, percent):
        """determine the state of the given field tSquare with the given percent of pixels that fit the expectation"""
        fieldState = tSquare.fieldState
        selectivity = fieldColorStats.minSelectivity
        state = self.inRange(fieldColorStats.stats, fieldState, percent)
        return state

    def drawDebug(self):
        if self.preMoveStats is not None:
            for tSquare in self.trapez.genSquares():
                state = self.squareState(
                    self.fieldColorStats,
                    tSquare,
                    self.fieldColorStats.colorPercent[tSquare.an],
                )
                percent = "%.0f" % (self.fieldColorStats.colorPercent[tSquare.an])
                color = (0, 255, 0) if state else (0, 0, 255)
                self.trapez.drawRCenteredText(
                    self.image, percent, tSquare.rcx, tSquare.rcy, color
                )
            filepath = "%s/colorState-%04d.jpg" % (self.imagePath, self.frames)
            self.trapez.video.writeImage(self.image, filepath)

inRange(stats, fs, percent)

check if the given percent value is in the range for the given fieldStats

Source code in pcwawc/detectstate.py
113
114
115
116
117
def inRange(self, stats, fs, percent):
    """check if the given percent value is in the range for the given fieldStats"""
    minValue = stats[fs].min
    maxValue = stats[fs].max
    return percent >= minValue and percent <= maxValue

squareState(fieldColorStats, tSquare, percent)

determine the state of the given field tSquare with the given percent of pixels that fit the expectation

Source code in pcwawc/detectstate.py
119
120
121
122
123
124
def squareState(self, fieldColorStats, tSquare, percent):
    """determine the state of the given field tSquare with the given percent of pixels that fit the expectation"""
    fieldState = tSquare.fieldState
    selectivity = fieldColorStats.minSelectivity
    state = self.inRange(fieldColorStats.stats, fieldState, percent)
    return state

DetectState

Bases: object

keeps track of the detections state

Source code in pcwawc/detectstate.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class DetectState(object):
    """
    keeps track of the detections state
    """

    def __init__(
        self,
        validDiffSumTreshold,
        invalidDiffSumTreshold,
        diffSumDeltaTreshold,
        onPieceMoveDetected=None,
        onMoveDetected=None,
    ):
        """construct me"""
        self.frames = 0
        self.validFrames = 0
        self.invalidFrames = 0
        self.validDiffSumTreshold = validDiffSumTreshold
        self.invalidDiffSumTreshold = invalidDiffSumTreshold
        self.diffSumDeltaTreshold = diffSumDeltaTreshold
        self.onPieceMoveDetected = onPieceMoveDetected
        self.onMoveDetecte = onMoveDetected

    def check(self, validChanges, diffSum, diffSumDelta, meanFrameCount):
        """check the detection state given the current diffSum and diffSumDelta"""
        self.invalidStarted = self.invalidFrames > 3
        self.invalidStable = (self.invalidFrames >= meanFrameCount,)
        self.validStable = self.validFrames >= meanFrameCount
        # trigger statistics push if valid
        if self.invalidStable:
            self.validBoard = (
                diffSum < self.invalidDiffSumTreshold
                and abs(diffSumDelta) < self.diffSumDeltaTreshold
                and validChanges >= 62
            )
        else:
            self.validBoard = diffSum < self.validDiffSumTreshold
        if self.validBoard:
            self.validFrames += 1
        else:
            self.invalidFrames += 1

    def nextFrame(self):
        self.frames += 1

    def invalidEnd(self):
        self.invalidFrames = 0

    def validEnd(self):
        self.validFrames = 0

__init__(validDiffSumTreshold, invalidDiffSumTreshold, diffSumDeltaTreshold, onPieceMoveDetected=None, onMoveDetected=None)

construct me

Source code in pcwawc/detectstate.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
    self,
    validDiffSumTreshold,
    invalidDiffSumTreshold,
    diffSumDeltaTreshold,
    onPieceMoveDetected=None,
    onMoveDetected=None,
):
    """construct me"""
    self.frames = 0
    self.validFrames = 0
    self.invalidFrames = 0
    self.validDiffSumTreshold = validDiffSumTreshold
    self.invalidDiffSumTreshold = invalidDiffSumTreshold
    self.diffSumDeltaTreshold = diffSumDeltaTreshold
    self.onPieceMoveDetected = onPieceMoveDetected
    self.onMoveDetecte = onMoveDetected

check(validChanges, diffSum, diffSumDelta, meanFrameCount)

check the detection state given the current diffSum and diffSumDelta

Source code in pcwawc/detectstate.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def check(self, validChanges, diffSum, diffSumDelta, meanFrameCount):
    """check the detection state given the current diffSum and diffSumDelta"""
    self.invalidStarted = self.invalidFrames > 3
    self.invalidStable = (self.invalidFrames >= meanFrameCount,)
    self.validStable = self.validFrames >= meanFrameCount
    # trigger statistics push if valid
    if self.invalidStable:
        self.validBoard = (
            diffSum < self.invalidDiffSumTreshold
            and abs(diffSumDelta) < self.diffSumDeltaTreshold
            and validChanges >= 62
        )
    else:
        self.validBoard = diffSum < self.validDiffSumTreshold
    if self.validBoard:
        self.validFrames += 1
    else:
        self.invalidFrames += 1

environment

Environment

Runtime Environment

Source code in pcwawc/environment.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Environment:
    """Runtime Environment"""

    debugImagePath = "/tmp/pcwawc/"

    def __init__(self):
        """get the directory in which the testMedia resides"""
        self.scriptPath = Path(__file__).parent
        self.projectPath = self.scriptPath.parent
        self.testMediaPath = Path(self.projectPath, "testMedia")
        self.testMedia = str(self.testMediaPath.absolute()) + "/"
        self.games = str(self.projectPath) + "/games"

    @staticmethod
    def checkDir(path):
        # print (path)
        if not os.path.isdir(path):
            try:
                os.mkdir(path)
            except OSError:
                print("Creation of the directory %s failed" % path)
            else:
                print("Successfully created the directory %s " % path)

    @staticmethod
    def inContinuousIntegration():
        """
        are we in a Continuous Integration Environment?
        """
        publicCI = getpass.getuser() in ["travis", "runner"]
        privateCI = "capri.bitplan.com" == socket.getfqdn()
        return publicCI or privateCI

__init__()

get the directory in which the testMedia resides

Source code in pcwawc/environment.py
14
15
16
17
18
19
20
def __init__(self):
    """get the directory in which the testMedia resides"""
    self.scriptPath = Path(__file__).parent
    self.projectPath = self.scriptPath.parent
    self.testMediaPath = Path(self.projectPath, "testMedia")
    self.testMedia = str(self.testMediaPath.absolute()) + "/"
    self.games = str(self.projectPath) + "/games"

inContinuousIntegration() staticmethod

are we in a Continuous Integration Environment?

Source code in pcwawc/environment.py
33
34
35
36
37
38
39
40
@staticmethod
def inContinuousIntegration():
    """
    are we in a Continuous Integration Environment?
    """
    publicCI = getpass.getuser() in ["travis", "runner"]
    privateCI = "capri.bitplan.com" == socket.getfqdn()
    return publicCI or privateCI

environment4test

Environment4Test

Bases: Environment

Test Environment

Source code in pcwawc/environment4test.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class Environment4Test(Environment):
    """Test Environment"""

    warpPointList = [
        ([427, 180], [962, 180], [952, 688], [430, 691]),
        ([288, 76], [1057, 93], [1050, 870], [262, 866]),
        ([132, 89], [492, 90], [497, 451], [122, 462]),
        ([143, 18], [511, 13], [511, 387], [147, 386]),
        ([260, 101], [962, 133], [950, 815], [250, 819]),
        ([250, 108], [960, 135], [947, 819], [242, 828]),
        ([277, 106], [972, 129], [957, 808], [270, 800]),
        ([278, 88], [999, 88], [1072, 808], [124, 786]),
        ([360, 238], [2380, 224], [2385, 2251], [407, 2256]),
        ([483, 132], [1338, 124], [1541, 936], [255, 953]),
        ([8, 1], [813, 1], [817, 812], [3, 809]),
        ([0, 0], [522, 0], [523, 523], [0, 523]),
        ([678, 33], [1582, 33], [1571, 923], [686, 896]),
    ]

    rotations = [0, 0, 0, 0, 270, 270, 270, 0, 0, 0, 0, 0, 270]

    fens = [
        Board.EMPTY_FEN,
        Board.EMPTY_FEN,
        Board.START_FEN,
        Board.EMPTY_FEN,
        Board.START_FEN,
        Board.START_FEN,
        Board.START_FEN,
        Board.EMPTY_FEN,
        Board.EMPTY_FEN,
        Board.EMPTY_FEN,
        Board.EMPTY_FEN,
        Board.START_FEN,
        Board.START_FEN,
    ]

    def __init__(self, headless=None):
        """
        constructor
        """
        super().__init__()
        if headless is None:
            self.headless = Environment.inContinuousIntegration()
        else:
            self.headless = headless
        rlen = len(Environment4Test.rotations)
        wlen = len(Environment4Test.warpPointList)
        fenlen = len(Environment4Test.fens)
        if rlen != wlen:
            raise Exception("%d rotations for %d warpPoints" % (rlen, wlen))
        if fenlen != wlen:
            raise Exception("%d FENs for %d images" % (fenlen, wlen))
        self.imageInfos = []
        for num in range(1, 1000):
            path = self.testMedia + "chessBoard%03d.jpg" % (num)
            if not os.path.isfile(path):
                break
            if num - 1 >= len(Environment4Test.rotations):
                raise Exception(
                    "%d test files for %d warpPoints/rotations" % (num, wlen)
                )
            imageInfo = ImageInfo(
                num,
                title="image%03d" % (num),
                path=path,
                fen=Environment4Test.fens[num - 1],
                rotation=Environment4Test.rotations[num - 1],
                warpPoints=Environment4Test.warpPointList[num - 1],
            )
            self.imageInfos.append(imageInfo)

    def getImage(self, num: int):
        """
        get the image with the given number

        Args:
            num(int): the index of the image
        """
        image, video = self.getImageWithVideo(num)
        if video is None:
            pass
        return image

    def getVideo(self) -> Video:
        """
        get a Video (potentially headless)

        Returns:
            Video: the video handler for openCV
        """
        video = Video()
        video.headless = self.headless
        return video

    def getImageWithVideo(self, num: int):
        """
        get the image and video

        Args:
            num(int): the number of the image

        Returns:
            image: the image
            video: the video display
        """
        video = self.getVideo()
        filename = self.testMedia + "chessBoard%03d.jpg" % (num)
        image = video.readImage(filename)
        height, width = image.shape[:2]
        print("read image %s: %dx%d" % (filename, width, height))
        return image, video

    def prepareFromImageInfo(self, imageInfo):
        """
        prepare a test environment from the given image Inforrmation
        """
        warp = Warp(list(imageInfo.warpPoints))
        warp.rotation = imageInfo.rotation
        image, video = self.getImageWithVideo(imageInfo.index)
        return image, video, warp

    def loadFromImageInfo(self, imageInfo):
        analyzer = self.analyzerFromImageInfo(imageInfo)
        start = timer()
        cbImageSet = analyzer.vision.readChessBoardImage()
        assert analyzer.hasImage()
        analyzer.processImageSet(cbImageSet)
        end = timer()
        cbWarped = cbImageSet.cbWarped
        print(
            "%.3fs for loading image %s: %4d x %4d"
            % ((end - start), imageInfo.title, cbWarped.width, cbWarped.height)
        )
        return cbImageSet.cbWarped

    def analyzerFromImageInfo(self, imageInfo):
        args = Args("test")
        args.parse(
            [
                "--input",
                imageInfo.path,
                "--fen",
                imageInfo.fen,
                "--warp",
                imageInfo.warpPointsAsString(),
            ]
        )
        analyzer = VideoAnalyzer(args.args)
        analyzer.setUpDetector()
        analyzer.setDebug(True)
        analyzer.open()
        return analyzer

    def getTestVideos(self, exclude=["baxter.avi"]):
        for file in os.listdir(self.testMedia):
            if file.endswith(".avi"):
                if file not in exclude:
                    yield file

__init__(headless=None)

constructor

Source code in pcwawc/environment4test.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(self, headless=None):
    """
    constructor
    """
    super().__init__()
    if headless is None:
        self.headless = Environment.inContinuousIntegration()
    else:
        self.headless = headless
    rlen = len(Environment4Test.rotations)
    wlen = len(Environment4Test.warpPointList)
    fenlen = len(Environment4Test.fens)
    if rlen != wlen:
        raise Exception("%d rotations for %d warpPoints" % (rlen, wlen))
    if fenlen != wlen:
        raise Exception("%d FENs for %d images" % (fenlen, wlen))
    self.imageInfos = []
    for num in range(1, 1000):
        path = self.testMedia + "chessBoard%03d.jpg" % (num)
        if not os.path.isfile(path):
            break
        if num - 1 >= len(Environment4Test.rotations):
            raise Exception(
                "%d test files for %d warpPoints/rotations" % (num, wlen)
            )
        imageInfo = ImageInfo(
            num,
            title="image%03d" % (num),
            path=path,
            fen=Environment4Test.fens[num - 1],
            rotation=Environment4Test.rotations[num - 1],
            warpPoints=Environment4Test.warpPointList[num - 1],
        )
        self.imageInfos.append(imageInfo)

getImage(num)

get the image with the given number

Parameters:

Name Type Description Default
num(int)

the index of the image

required
Source code in pcwawc/environment4test.py
104
105
106
107
108
109
110
111
112
113
114
def getImage(self, num: int):
    """
    get the image with the given number

    Args:
        num(int): the index of the image
    """
    image, video = self.getImageWithVideo(num)
    if video is None:
        pass
    return image

getImageWithVideo(num)

get the image and video

Parameters:

Name Type Description Default
num(int)

the number of the image

required

Returns:

Name Type Description
image

the image

video

the video display

Source code in pcwawc/environment4test.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def getImageWithVideo(self, num: int):
    """
    get the image and video

    Args:
        num(int): the number of the image

    Returns:
        image: the image
        video: the video display
    """
    video = self.getVideo()
    filename = self.testMedia + "chessBoard%03d.jpg" % (num)
    image = video.readImage(filename)
    height, width = image.shape[:2]
    print("read image %s: %dx%d" % (filename, width, height))
    return image, video

getVideo()

get a Video (potentially headless)

Returns:

Name Type Description
Video Video

the video handler for openCV

Source code in pcwawc/environment4test.py
116
117
118
119
120
121
122
123
124
125
def getVideo(self) -> Video:
    """
    get a Video (potentially headless)

    Returns:
        Video: the video handler for openCV
    """
    video = Video()
    video.headless = self.headless
    return video

prepareFromImageInfo(imageInfo)

prepare a test environment from the given image Inforrmation

Source code in pcwawc/environment4test.py
145
146
147
148
149
150
151
152
def prepareFromImageInfo(self, imageInfo):
    """
    prepare a test environment from the given image Inforrmation
    """
    warp = Warp(list(imageInfo.warpPoints))
    warp.rotation = imageInfo.rotation
    image, video = self.getImageWithVideo(imageInfo.index)
    return image, video, warp

ImageInfo

information about a chessboard test image

Source code in pcwawc/environment4test.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ImageInfo:
    """information about a chessboard test image"""

    def __init__(self, index, title, path, fen, rotation, warpPoints):
        """
        constructor
        """
        self.index = index
        self.title = title
        self.path = path
        self.fen = fen
        self.rotation = rotation
        self.warpPoints = warpPoints

    def warpPointsAsString(self):
        return str(self.warpPoints)

__init__(index, title, path, fen, rotation, warpPoints)

constructor

Source code in pcwawc/environment4test.py
17
18
19
20
21
22
23
24
25
26
def __init__(self, index, title, path, fen, rotation, warpPoints):
    """
    constructor
    """
    self.index = index
    self.title = title
    self.path = path
    self.fen = fen
    self.rotation = rotation
    self.warpPoints = warpPoints

eventhandling

Created on 2019-12-13

@author: wf

Event handling module see https://stackoverflow.com/a/1925836/1497139

Event

Bases: object

an Event

Source code in pcwawc/eventhandling.py
10
11
12
13
class Event(object):
    """an Event"""

    pass

Observable

Bases: object

an Observable

Source code in pcwawc/eventhandling.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Observable(object):
    """an Observable"""

    def __init__(self):
        self.callbacks = []

    def subscribe(self, callback):
        self.callbacks.append(callback)

    def unsubscribe(self, callback):
        if callback in self.callbacks:
            self.callbacks.remove(callback)

    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.items():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

field

Field

a single Field of a chess board as observed from a WebCam

Source code in pcwawc/field.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@implementer(ISquare)
class Field:
    """a single Field of a chess board as observed from a WebCam"""

    rows = 8
    cols = 8
    # bgr colors
    white = (255, 255, 255)
    lightGrey = (64, 64, 64)
    grey = (128, 128, 128)
    darkGrey = (192, 192, 192)
    green = (0, 255, 0)
    red = (0, 0, 255)
    black = (0, 0, 0)

    @staticmethod
    def hsv_to_rgb(h, s, v):
        return colorsys.hsv_to_rgb(h, s, v)

    @staticmethod
    def hsv255_to_rgb255(h, s, v):
        r, g, b = Field.hsv_to_rgb(h / 255, s / 255, v / 255)
        return (int(r * 255), int(g * 255), int(b * 255))

    # construct me
    def __init__(self, board, row, col):
        self.board = board
        # row and column indices from 0-7
        self.row = row
        self.col = col
        self.squareIndex = (7 - row) * 8 + col
        self.square = chess.SQUARES[self.squareIndex]
        # https://python-chess.readthedocs.io/en/latest/core.html - chess.WHITE=True, chess.BLACK=False
        # https://gamedev.stackexchange.com/a/44998/133453
        # A8 at 0,0 is white moving an odd number of steps horizontally and vertically will end up on a black
        self.fieldColor = (self.col + self.row) % 2 == 0
        # algebraic notation of field
        # A1 to H8
        self.an = chess.SQUARE_NAMES[self.squareIndex]
        # center pixel position of field
        self.px = None
        self.py = None
        self.pcx = None
        self.pcy = None
        self.width = None
        self.height = None
        self.maxX = None
        self.maxY = None
        self.distance = None
        self.step = None
        self.hsvStats = None
        self.rgbStats = None
        self.luminance = None
        self.rgbColorKey = None
        self.colorKey = None

    def getRect(self):
        x1 = int(self.pcx - self.width / 2)
        y1 = int(self.pcy - self.height / 2)
        return x1, y1, int(self.width), int(self.height)

    def divideInROIs(self, grid, roiLambda):
        self.rois = []
        for roiIndex in range(grid.rois):
            self.rois.append(FieldROI(self, grid, roiIndex, roiLambda))

    def getPiece(self):
        if self.board is None:
            raise Exception("Board not set for %s" % (self.an))
        if self.board.chessboard is None:
            raise Exception("board.chessboard not set for %s" % (self.an))
        piece = self.board.chessboard.piece_at(self.square)
        return piece

    def getFieldState(self):
        piece = self.getPiece()
        if piece is None:
            if self.fieldColor == chess.WHITE:
                return FieldState.WHITE_EMPTY
            else:
                return FieldState.BLACK_EMPTY
        elif piece.color == chess.WHITE:
            if self.fieldColor == chess.WHITE:
                return FieldState.WHITE_WHITE
            else:
                return FieldState.BLACK_WHITE
        else:
            if self.fieldColor == chess.WHITE:
                return FieldState.WHITE_BLACK
            else:
                return FieldState.BLACK_BLACK
        # this can't happen
        return None

    # analyze the color around my center pixel to the given
    # distance
    def analyzeColor(self, image, hsv, distance=1, step=1):
        self.distance = distance
        self.step = step
        self.hsvStats = ColorStats()
        self.rgbStats = ColorStats()
        for dx in range(-distance * step, distance * step + 1, step):
            for dy in range(-distance * step, distance * step + 1, step):
                ph, ps, pv = hsv[self.pcy + dy, self.pcx + dx]
                b, g, r = image[self.pcy + dy, self.pcx + dx]
                # print ("(%3d,%3d)=(%3d,%3d,%3d)" % (self.pcx+dx,self.pcy+dy,ph,ps,pv))
                self.hsvStats.push(ph, ps, pv)
                self.rgbStats.push(r, g, b)
        self.luminance = self.hsvStats.c3Stats
        self.rgbColorKey = self.rgbStats.rgbColorKey()
        self.colorKey = self.hsvStats.colorKey()

    def getColor(self):
        h, s, v = self.hsvStats.mean()
        r, g, b = Field.hsv255_to_rgb255(h, s, v)
        bgr = (b, g, r)
        # print("(%3d,%3d)=(%3d,%3d,%3d) (%3d,%3d,%3d)" % (self.pcx,self.pcy,h,s,v,r,g,b))
        return bgr

    def interPolate(self, rx, ry):
        """interpolate the given relative coordinate"""
        # interpolate the pixel
        x = int(self.pcx + self.width * (rx - 0.5) + 0.5)
        y = int(self.pcy + self.height * (ry - 0.5) + 0.5)
        return self.limit(x, y)

    def limit(self, x, y):
        if self.maxX is not None:
            if x >= self.maxX:
                x = self.maxX - 1
        if self.maxY is not None:
            if y >= self.maxY:
                y = self.maxY - 1
        if x < 0:
            x = 0
        if y < 0:
            y = 0
        pixel = (x, y)
        return pixel

    def setRect(self, width, height, fieldWidth, fieldHeight):
        pcx = int(fieldWidth * (2 * self.col + 1) // 2)
        pcy = int(fieldHeight * (2 * self.row + 1) // 2)
        self.width = fieldWidth
        self.height = fieldHeight
        self.pcx = pcx
        self.pcy = pcy
        self.maxX = width
        self.maxY = height

    def getSquareImage(self, cbImage):
        x, y, dh, dw = self.getRect()
        squareImage = cbImage.image[y : y + dh, x : x + dw]
        return squareImage

    def drawDebug(self, video, image, detectedFieldState):
        pcx = self.pcx
        pcy = self.pcy
        distance = self.distance
        step = self.step
        fieldState = self.getFieldState()
        detectColor = (
            Field.black
        )  # Field.green if fieldState == detectedFieldState else Field.red
        fieldColor = self.getColor()
        x1, y1, x2, y2 = (
            pcx - distance * step,
            pcy - distance * step,
            pcx + distance * step,
            pcy + distance * step,
        )
        # outer thickness for displaying detect state: green ok red - there is an issue
        ot = 2
        # inner thickness for displaying the field color
        it = 3
        video.drawRectangle(
            image,
            (x1 - ot, y1 - ot),
            (x2 + ot, y2 + ot),
            thickness=ot,
            color=detectColor,
        )
        video.drawRectangle(image, (x1, y1), (x2, y2), thickness=it, color=fieldColor)
        piece = self.getPiece()
        if piece is None:
            emptyFieldColor = (
                Field.white if fieldState == FieldState.WHITE_EMPTY else Field.black
            )
            video.drawRectangle(
                image,
                (x1 + it, y1 + it),
                (x2 - it, y2 - it),
                thickness=-1,
                color=emptyFieldColor,
            )
        else:
            symbol = piece.symbol()  # piece.unicode_symbol()
            video.drawCenteredText(image, symbol, pcx, pcy)

interPolate(rx, ry)

interpolate the given relative coordinate

Source code in pcwawc/field.py
225
226
227
228
229
230
def interPolate(self, rx, ry):
    """interpolate the given relative coordinate"""
    # interpolate the pixel
    x = int(self.pcx + self.width * (rx - 0.5) + 0.5)
    y = int(self.pcy + self.height * (ry - 0.5) + 0.5)
    return self.limit(x, y)

FieldROI

a region of interest within the square image area of pixels represented by some pixels

Source code in pcwawc/field.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class FieldROI:
    """a region of interest within the square image area of pixels represented by some pixels"""

    # construct me from a field, a generator for relative pixels and the number of x and y steps to generate from
    def __init__(self, field, grid, roiIndex, relPixelLambda):
        self.relPixelLambda = relPixelLambda
        self.grid = grid
        self.roiIndex = roiIndex
        self.pixels = grid.xsteps * grid.ysteps
        self.field = field
        self.colorStats = ColorStats()

    # analyze the given region of interest for the given image
    def analyze(self, image):
        for pixel in self.pixelList():
            x, y = pixel
            c1, c2, c3 = image[y, x]
            self.colorStats.push(c1, c2, c3)

    def pixelList(self):
        """generate a pixel list by using the generated relative position from"""
        for xstep in range(self.grid.xsteps):
            for ystep in range(self.grid.ysteps):
                rx, ry = self.relPixelLambda(self.grid, self.roiIndex, xstep, ystep)
                rx, ry = self.grid.shiftSafety(rx, ry)
                pixel = self.field.interPolate(rx, ry)
                yield pixel

pixelList()

generate a pixel list by using the generated relative position from

Source code in pcwawc/field.py
 96
 97
 98
 99
100
101
102
103
def pixelList(self):
    """generate a pixel list by using the generated relative position from"""
    for xstep in range(self.grid.xsteps):
        for ystep in range(self.grid.ysteps):
            rx, ry = self.relPixelLambda(self.grid, self.roiIndex, xstep, ystep)
            rx, ry = self.grid.shiftSafety(rx, ry)
            pixel = self.field.interPolate(rx, ry)
            yield pixel

Grid

Grid Info in the region of interest

Source code in pcwawc/field.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class Grid:
    """Grid Info in the region of interest"""

    def __init__(self, rois, xsteps, ysteps, safetyX=0, safetyY=0):
        self.rois = rois
        self.xsteps = xsteps
        self.ysteps = ysteps
        # safety Margin in percent
        self.safetyX = safetyX / 100
        self.safetyY = safetyY / 100

    @staticmethod
    def split(pStep, parts):
        return (pStep + 1) / (parts + 1)

    def xstep(self, pXStep):
        return Grid.split(pXStep, self.xsteps)

    def ystep(self, pYStep):
        return Grid.split(pYStep, self.ysteps)

    def d(self):
        return 1 / (self.rois)

    def dofs(self, roiIndex):
        return self.d() * (roiIndex)

    def safeShift(self, value, safetyMargin):
        if safetyMargin == 0:
            return value
        else:
            return value * (1 - 2 * safetyMargin) + safetyMargin

    def shiftSafety(self, rx, ry):
        return self.safeShift(rx, self.safetyX), self.safeShift(ry, self.safetyY)

SquareKind

Bases: IntEnum

kind of Square

Source code in pcwawc/field.py
17
18
19
20
21
22
23
24
25
26
27
28
class SquareKind(IntEnum):
    """kind of Square"""

    FIELD_WHITE = 0
    FIELD_BLACK = 1
    PIECE_WHITE = 2
    PIECE_BLACK = 3

    def title(
        self, titles=["white field", "black field", "white piece", "black piece"]
    ):
        return titles[self]

fpscheck

FPSCheck

Bases: object

Frame per second tracker

Source code in pcwawc/fpscheck.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class FPSCheck(object):
    """Frame per second tracker"""

    def __init__(self):
        # store the start time, end time, and total number of frames
        # that were examined between the start and end intervals
        self._start = None
        self._end = None
        self._numFrames = 0

    def start(self):
        # start the timer
        self._start = datetime.datetime.now()
        self._end = datetime.datetime.now()
        return self

    def update(self):
        # increment the total number of frames examined during the
        # start and end intervals
        self._numFrames += 1
        # update the timer
        self._end = datetime.datetime.now()

    def elapsed(self):
        # return the total number of seconds between the start and
        # end interval
        return (self._end - self._start).total_seconds()

    def fps(self):
        # compute the (approximate) frames per second
        return self._numFrames / self.elapsed()

game

Game

Bases: JsonAbleMixin

keeps track of a games state

Source code in pcwawc/game.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@implementer(IGame)
class Game(JsonAbleMixin):
    """keeps track of a games state"""

    def __init__(self, gameid):
        self.gameid = gameid
        self.fen = chess.STARTING_BOARD_FEN
        # http://www.saremba.de/chessgml/standards/pgn/pgn-complete.htm
        self.pgn = None
        self.headers = {}
        self.headers["Date"] = strftime("%Y-%m-%d %H:%M:%S")
        self.locked = False
        self.moveIndex = 0

    # def __getstate__(self):
    #    state={}
    #    state["gameid"]=self.gameid
    #    state["pgn"]=self.pgn
    #    state["locked"]=self.locked
    #    state["moveIndex"]=self.moveIndex
    #    return state

    # def __setstate__(self, state):
    #    self.gameid=state["gameid"]
    #   self.pgn=state["pgn"]
    #    self.locked=state["locked"]
    #    self.moveIndex=state["moveIndex"]
    def updateHeaders(self, headers):
        for key, header in self.headers.items():
            headers[key] = header

    def update(self, board):
        try:
            game = chess.pgn.Game.from_board(board.chessboard)
            self.updateHeaders(game.headers)
            self.pgn = str(game)
        except BaseException as e:
            print("pgn error: %s", str(e))

    def move(self, board):
        self.moveIndex += 1
        self.update(board)

    @staticmethod
    def gameId():
        gameid = strftime("game_%Y-%m-%d_%H%M%S")
        return gameid

    def showDebug(self):
        print("fen: %s" % (self.fen))
        print("pgn: %s" % (self.pgn))
        print("moveIndex: %d" % (self.moveIndex))

WebCamGame

Bases: Game

keeps track of a webcam games state

Source code in pcwawc/game.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class WebCamGame(Game):
    """keeps track of a webcam games state"""

    def __init__(self, gameid):
        super(WebCamGame, self).__init__(gameid)
        self.warp = None

    def checkEnvironment(self, env):
        Environment.checkDir(env.games)

    def save(self, path="games"):
        env = Environment()
        savepath = str(env.projectPath) + "/" + path
        Environment.checkDir(savepath)
        savedir = savepath + "/" + self.gameid
        Environment.checkDir(savedir)
        jsonFile = savedir + "/" + self.gameid + "-webcamgame"
        self.writeJson(jsonFile)
        gameJsonFile = savedir + "/" + self.gameid
        self.writeJson(gameJsonFile)

        if self.locked is not None and not self.locked:
            if self.fen is not None:
                fenFile = savedir + "/" + self.gameid + ".fen"
                print(self.fen, file=open(fenFile, "w"))
            if self.pgn is not None:
                pgnFile = savedir + "/" + self.gameid + ".pgn"
                # see https://python-chess.readthedocs.io/en/latest/pgn.html
                print(self.pgn, file=open(pgnFile, "w"), end="\n\n")
        return savedir

    @staticmethod
    def createNewGame():
        return WebCamGame(Game.gameId())

    @staticmethod
    def fromArgs(args):
        env = Environment()
        if args is None or args.game is None:
            webCamGame = WebCamGame.createNewGame()
        else:
            gamepath = args.game
            if not gamepath.startswith("/"):
                gamepath = env.games + "/" + gamepath
            webCamGame = WebCamGame.readJson(gamepath)
            if webCamGame is None:
                # self.videoAnalyzer.log("could not read %s " % (gamepath))
                webCamGame = webCamGame.createNewGame()
        webCamGame.checkEnvironment(env)
        if args is not None:
            if args.event is not None:
                webCamGame.headers["Event"] = args.event
            if args.site is not None:
                webCamGame.headers["Site"] = args.site
            if args.round is not None:
                webCamGame.headers["Round"] = args.round
            if args.white is not None:
                webCamGame.headers["White"] = args.white
            if args.black is not None:
                webCamGame.headers["Black"] = args.black

        return webCamGame

    @staticmethod
    def getWebCamGames(path):
        webCamGames = {}
        for file in os.listdir(path):
            if file.endswith(".json"):
                filePath = os.path.join(path, file)
                webCamGame = WebCamGame.readJson(filePath, "")
                if isinstance(webCamGame, WebCamGame):
                    webCamGames[webCamGame.gameid] = webCamGame
                else:
                    raise Exception("invalid json file %s" % filePath)
        return webCamGames

histogram

Created on 2019-11-29

@author: wf

Histogram

Image Histogram

Source code in pcwawc/histogram.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class Histogram:
    """Image Histogram"""

    colors = ("blue", "green", "red")

    def __init__(self, image, histSize=256, histRange=(0, 256)):
        """construct me from the given image hist Size and histRange"""
        self.hist = {}
        self.stats = {}
        self.image = image

        start = timer()
        # the upper boundary is exclusive
        for channel in range(len(Histogram.colors)):
            self.hist[channel] = cv2.calcHist(
                [image], [channel], None, [histSize], histRange, accumulate=False
            )
            histindexed = list(enumerate(np.reshape(self.hist[channel], histSize)))
            self.stats[channel] = Stats(histindexed)
        bstats, gstats, rstats = self.stats[0], self.stats[1], self.stats[2]
        self.color = (bstats.mean, gstats.mean, rstats.mean)
        self.mincolor = (bstats.min, gstats.min, rstats.min)
        self.maxcolor = (bstats.max, gstats.max, rstats.max)
        self.maxdelta = (bstats.maxdelta, gstats.maxdelta, rstats.maxdelta)
        # here we are using the color information! This should make the difference!
        self.factor = (bstats.factor, gstats.factor, rstats.factor)
        self.stdv = (bstats.stdv, gstats.stdv, rstats.stdv)
        end = timer()
        self.time = end - start

    def fix(self, value):
        return 0 if value < 0 else 255 if value > 255 else value

    def range(self, relFactor=1.0):
        bstats, gstats, rstats = self.stats[0], self.stats[1], self.stats[2]
        bl, bu = bstats.range(relFactor)
        gl, gu = gstats.range(relFactor)
        rl, ru = rstats.range(relFactor)
        # return np.array([bl,gl,rl],dtype = 'uint8'),np.array([bu,gu,ru],dtype='uint8')
        return (bl, gl, rl), (bu, gu, ru)

    def colorRangeWithFactor(self, rangeFactor):
        b, g, r = self.color
        bs, gs, rs = self.stdv
        rf = rangeFactor
        lower = np.array(
            [self.fix(b - bs * rf), self.fix(g - gs * rf), self.fix(r - rs * rf)],
            dtype="uint8",
        )
        upper = np.array(
            [self.fix(b + bs * rf), self.fix(g + gs * rf), self.fix(r + rs * rf)],
            dtype="uint8",
        )
        return lower, upper

    def colorMask(self, image, rangeFactor):
        """create a color mask for this histogram and apply it to the given image"""
        # lower,upper=self.colorRange(rangeFactor)
        lower, upper = self.mincolor, self.maxcolor
        colorMask = cv2.inRange(image, lower, upper)
        return colorMask

    def showDebug(self):
        print("calculation took %.4f s" % (self.time))
        for channel in range(len(Histogram.colors)):
            print(vars(self.stats[channel]))

    def plotRow(self, ax1, ax2):
        self.rgb = cv2.cvtColor(self.image, cv2.COLOR_BGR2RGB)
        ax1.imshow(self.rgb), ax1.axis("off")
        for i, col in enumerate(Histogram.colors):
            ax2.plot(self.hist[i], color=col)
            # ax2.xlim([0,256])

    def preparePlot(self, rows, cols, title="color histogram", fontsize=20):
        fig, axes = plt.subplots(rows, cols)
        fig.suptitle(title, fontsize=fontsize)
        return fig, axes

    def plot(self):
        fig, (ax1, ax2) = self.preparePlot(1, 2)
        self.plotRow(ax1, ax2)
        return fig

    def save(self, filepath):
        fig = self.plot()
        self.savefig(fig, filepath)

    def savefig(self, fig, filepath):
        fig.savefig(filepath)
        plt.close(fig)

    def show(self):
        fig = self.plot()
        plt.show()
        plt.close(fig)

__init__(image, histSize=256, histRange=(0, 256))

construct me from the given image hist Size and histRange

Source code in pcwawc/histogram.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(self, image, histSize=256, histRange=(0, 256)):
    """construct me from the given image hist Size and histRange"""
    self.hist = {}
    self.stats = {}
    self.image = image

    start = timer()
    # the upper boundary is exclusive
    for channel in range(len(Histogram.colors)):
        self.hist[channel] = cv2.calcHist(
            [image], [channel], None, [histSize], histRange, accumulate=False
        )
        histindexed = list(enumerate(np.reshape(self.hist[channel], histSize)))
        self.stats[channel] = Stats(histindexed)
    bstats, gstats, rstats = self.stats[0], self.stats[1], self.stats[2]
    self.color = (bstats.mean, gstats.mean, rstats.mean)
    self.mincolor = (bstats.min, gstats.min, rstats.min)
    self.maxcolor = (bstats.max, gstats.max, rstats.max)
    self.maxdelta = (bstats.maxdelta, gstats.maxdelta, rstats.maxdelta)
    # here we are using the color information! This should make the difference!
    self.factor = (bstats.factor, gstats.factor, rstats.factor)
    self.stdv = (bstats.stdv, gstats.stdv, rstats.stdv)
    end = timer()
    self.time = end - start

colorMask(image, rangeFactor)

create a color mask for this histogram and apply it to the given image

Source code in pcwawc/histogram.py
110
111
112
113
114
115
def colorMask(self, image, rangeFactor):
    """create a color mask for this histogram and apply it to the given image"""
    # lower,upper=self.colorRange(rangeFactor)
    lower, upper = self.mincolor, self.maxcolor
    colorMask = cv2.inRange(image, lower, upper)
    return colorMask

Stats

Calculate Histogram statistics see https://math.stackexchange.com/questions/857566/how-to-get-the-standard-deviation-of-a-given-histogram-image

Source code in pcwawc/histogram.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Stats:
    """Calculate Histogram statistics see https://math.stackexchange.com/questions/857566/how-to-get-the-standard-deviation-of-a-given-histogram-image"""

    def __init__(self, histindexed):
        self.n = len(histindexed)
        self.max = -sys.maxsize
        self.min = sys.maxsize
        self.sum = 0
        self.prod = 0
        self.sqsum = 0
        for x, y in histindexed:
            self.sum += y
            self.prod += x * y
        self.mean = 0 if self.sum == 0 else self.prod / self.sum
        for x, y in histindexed:
            if y > 0:
                self.min = min(self.min, x)
                self.max = max(self.max, x)
            dx = x - self.mean
            self.sqsum += y * dx * dx
        # σ²
        self.variance = 0 if self.sqsum == 0 else self.sqsum / self.sum
        self.stdv = math.sqrt(self.variance)
        self.maxdelta = max(self.mean - self.min, self.max - self.mean)
        self.factor = 0 if self.stdv == 0 else self.maxdelta / self.stdv
        pass

    def range(self, relFactor=1.0, minValue=0, maxValue=255):
        """return a range relative to my min max range to widen e.g. by 10% use factor 1.1"""
        lower = self.mean - self.stdv * self.factor * relFactor
        if lower < minValue:
            lower = minValue
        upper = self.mean + self.stdv * self.factor * relFactor
        if upper > maxValue:
            upper = maxValue
        return lower, upper

range(relFactor=1.0, minValue=0, maxValue=255)

return a range relative to my min max range to widen e.g. by 10% use factor 1.1

Source code in pcwawc/histogram.py
44
45
46
47
48
49
50
51
52
def range(self, relFactor=1.0, minValue=0, maxValue=255):
    """return a range relative to my min max range to widen e.g. by 10% use factor 1.1"""
    lower = self.mean - self.stdv * self.factor * relFactor
    if lower < minValue:
        lower = minValue
    upper = self.mean + self.stdv * self.factor * relFactor
    if upper > maxValue:
        upper = maxValue
    return lower, upper

jsonablemixin

JsonAbleMixin

Bases: object

allow reading and writing derived objects from a json file

Source code in pcwawc/jsonablemixin.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class JsonAbleMixin(object):
    """allow reading and writing derived objects from a json file"""

    debug = False

    # read me from a yaml file
    @staticmethod
    def readJson(name, postfix=".json"):
        jsonFileName = name
        if not name.endswith(postfix):
            jsonFileName = name + postfix
        # is there a jsonFile for the given name
        if os.path.isfile(jsonFileName):
            if JsonAbleMixin.debug:
                print("reading %s" % (jsonFileName))
            json = open(jsonFileName).read()
            result = jsonpickle.decode(json)
            if JsonAbleMixin.debug:
                print(json)
                print(result)
            return result
        else:
            return None

    def asJson(self):
        json = jsonpickle.encode(self)
        return json

    # write me to the json file with the given name (without postfix)
    def writeJson(self, name, postfix=".json"):
        jsonFileName = name + postfix
        json = self.asJson()
        if JsonAbleMixin.debug:
            print("writing %s" % (jsonFileName))
            print(json)
            print(self)
        jsonFile = open(jsonFileName, "w")
        jsonFile.write(json)
        jsonFile.close()

lichessbridge

Created on 2019-12-21

@author: wf

Account

" Lichess account wrapper

Source code in pcwawc/lichessbridge.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class Account:
    """ " Lichess account wrapper"""

    def __init__(self, adict):
        self.adict = adict
        self.id = adict["id"]
        if "username" in adict:
            self.username = adict["username"]
        elif "name" in adict:
            self.username = adict["name"]
        pass

    def __str__(self):
        text = "%s - (%s)" % (self.username, self.id)
        return text

Game

Bases: Thread, Observable

Lichess game

Source code in pcwawc/lichessbridge.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class Game(threading.Thread, Observable):
    """Lichess game"""

    def __init__(self, lichess, game_id, debug=False, **kwargs):
        super().__init__(**kwargs)
        Observable.__init__(self)
        self.debug = debug
        self.game_id = game_id
        self.lichess = lichess
        self.account = lichess.getAccount()
        self.client = lichess.client
        self.stream = self.client.bots.stream_game_state(game_id)
        self.current_state = None
        self.isOn = False

    def postChat(self, msg):
        if self.debug:
            print("Chat %s: %s" % (self.game_id, msg))
        self.client.bots.post_message(self.game_id, msg)
        pass

    def abort(self):
        try:
            self.client.bots.abort_game(self.game_id)
            return True
        except requests.exceptions.HTTPError as httpError:
            return self.handle(httpError)

    def resign(self):
        try:
            self.client.bots.resign_game(self.game_id)
            return True
        except requests.exceptions.HTTPError as httpError:
            return self.handle(httpError)

    def move(self, move):
        try:
            self.client.bots.make_move(self.game_id, move)
            return True
        except requests.exceptions.HTTPError as httpError:
            return self.handle(httpError)

    def handle(self, httpError):
        if self.debug:
            print(httpError)
        return False

    def stop(self):
        self.isOn = False

    def run(self):
        if self.debug:
            print(
                "started thread for user %s game %s"
                % (self.account.username, self.game_id)
            )
        self.isOn = True
        # https://lichess.org/api#operation/botGameStream
        for event in self.stream:
            # stop if we are flagged to
            if not self.isOn:
                break
            self.current_state = event
            state = None
            eventtype = event["type"]
            if self.debug:
                print(eventtype, event)
            if eventtype == "gameFull":
                self.white = Account(event["white"])
                self.black = Account(event["black"])
                msg = "white:%s black:%s" % (self.white.username, self.black.username)
                self.postChat(msg)
                state = State(event["state"])
            elif eventtype == "gameState":
                state = State(event)
            if state is not None:
                self.fire(state=state)
        if self.debug:
            print(
                "%s stopped thread for game %s" % (self.account.username, self.game_id)
            )

Lichess

Lichess adapter

Source code in pcwawc/lichessbridge.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class Lichess:
    """Lichess adapter"""

    def __init__(self, tokenName="token", debug=False):
        self.debug = debug
        token = self.getToken(tokenName)
        if token is not None:
            self.session = berserk.TokenSession(token)
            self.client = berserk.Client(self.session)
        else:
            self.client = None
        self.account = None

    def getAccount(self):
        if self.account is None and self.client is not None:
            self.account = Account(self.client.account.get())
        return self.account

    def getToken(self, tokenname="token"):
        config = Config.default()
        if not tokenname in config.config:
            print("no token found in %s please add it" % (config.configFile))
            return None
        return config.config[tokenname]

    def pgnImport(self, pgn):
        payload = {"pgn": pgn, "analyse": "on"}
        res = requests.post("https://lichess.org/import", data=payload)
        print(res.url)
        pass

    def game(self, game_id):
        game = lichess.api.game(game_id)
        return game

    def challenge(self, oponentUserName):
        if self.debug:
            print("challenge %s by %s" % (self.getAccount().username, oponentUserName))
        client = self.client
        client.challenges.create(username=oponentUserName, rated=False)

    def waitForChallenge(self, timeout=1000, pollInterval=0.5):
        """wait for a challend and return the corresponding game"""
        # Stream whats happening and continue when we are challenged
        in_game = False
        client = self.client
        account = self.getAccount()
        if self.debug:
            print(
                "%s waiting for challenge (max %d secs, polling every %.1f secs)"
                % (account.username, timeout, pollInterval)
            )
        while not in_game:
            time.sleep(pollInterval)
            timeout -= pollInterval
            if timeout <= 0:
                raise Exception("time out waiting for challenge")
            for event in client.bots.stream_incoming_events():
                eventtype = event["type"]
                if self.debug:
                    print(eventtype, event)
                if eventtype == "gameStart":
                    game_id = event["game"]["id"]
                    in_game = True
                    break
                elif eventtype == "challenge":
                    challenge = event["challenge"]
                    game_id = challenge["id"]
                    challenger = challenge["challenger"]
                    # don't try to play against myself
                    if not challenger == account.username:
                        client.bots.accept_challenge(game_id)
                        in_game = True
                    elif self.debug:
                        print(
                            "%s avoiding to play against myself in game %s"
                            % (account.username, game_id)
                        )
        if self.debug:
            print("The game %s has started!" % (game_id))
        return game_id

waitForChallenge(timeout=1000, pollInterval=0.5)

wait for a challend and return the corresponding game

Source code in pcwawc/lichessbridge.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def waitForChallenge(self, timeout=1000, pollInterval=0.5):
    """wait for a challend and return the corresponding game"""
    # Stream whats happening and continue when we are challenged
    in_game = False
    client = self.client
    account = self.getAccount()
    if self.debug:
        print(
            "%s waiting for challenge (max %d secs, polling every %.1f secs)"
            % (account.username, timeout, pollInterval)
        )
    while not in_game:
        time.sleep(pollInterval)
        timeout -= pollInterval
        if timeout <= 0:
            raise Exception("time out waiting for challenge")
        for event in client.bots.stream_incoming_events():
            eventtype = event["type"]
            if self.debug:
                print(eventtype, event)
            if eventtype == "gameStart":
                game_id = event["game"]["id"]
                in_game = True
                break
            elif eventtype == "challenge":
                challenge = event["challenge"]
                game_id = challenge["id"]
                challenger = challenge["challenger"]
                # don't try to play against myself
                if not challenger == account.username:
                    client.bots.accept_challenge(game_id)
                    in_game = True
                elif self.debug:
                    print(
                        "%s avoiding to play against myself in game %s"
                        % (account.username, game_id)
                    )
    if self.debug:
        print("The game %s has started!" % (game_id))
    return game_id

State

Lichess state wrapper

Source code in pcwawc/lichessbridge.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class State:
    """Lichess state wrapper"""

    def __init__(self, adict):
        self.adict = adict
        self.type = adict["type"]
        if self.type == "gameState":
            self.moves = adict["moves"]
            self.wtime = adict["wtime"]
            self.btime = adict["btime"]
            self.winc = adict["winc"]
            self.binc = adict["binc"]
            self.wdraw = adict["wdraw"]
            self.bdraw = adict["bdraw"]
            self.moveList = self.moves.split()
            self.moveIndex = len(self.moveList) - 1

plotlib

PlotLib

Bases: object

create matplotlib based multipage diagrams e.g. color channel histograms of images

Source code in pcwawc/plotlib.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class PlotLib(object):
    """
    create matplotlib based multipage diagrams e.g. color channel histograms of images
    """

    def __init__(self, title, pagesize, imagesPerPage=4):
        """
        Constructor
        """
        self.title = title
        self.pagesize = pagesize
        self.pagewidth, self.pageheight = self.pagesize
        self.imagesPerPage = imagesPerPage
        self.images = []

    @staticmethod
    def A4(turned=False):
        # A4 canvas
        fig_width_cm = 21  # A4 page size in cm
        fig_height_cm = 29.7
        inches_per_cm = 1 / 2.54  # Convert cm to inches
        fig_width = fig_width_cm * inches_per_cm  # width in inches
        fig_height = fig_height_cm * inches_per_cm  # height in inches
        fig_size = [fig_width, fig_height]
        if turned:
            fig_size = fig_size[::-1]
        return fig_size

    def addPlot(self, image, imageTitle, xvalues=[], yvalues=[], isBGR=False):
        rgb = image
        if isBGR:
            rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        self.images.append((rgb, imageTitle, xvalues, yvalues))

    def plotImage(self, ax, image, imageTitle, thumbNailSize):
        thumbNail = cv2.resize(image, (thumbNailSize, thumbNailSize))
        ax.imshow(thumbNail)
        ax.title.set_text(imageTitle)
        ax.axis("off")
        pass

    def plotChannel(self, img, ax, channel, prevAx=None):
        cImg = img[:, :, channel]
        ax.hist(np.ndarray.flatten(cImg), bins=256)
        if prevAx is not None:
            # Use matplotlib's sharex parameter instead of manually joining axes
            ax.sharex(prevAx)
            prevAx.set_xticklabels([])

        ax.set_yticklabels([])
        return ax

    def plotHistogramm(self, image, axarr, rowIndex, colIndex):
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        rgb = image
        prevAx1 = self.plotChannel(hsv, axarr[rowIndex + 0, colIndex + 0], 0)  # hue
        prevAx2 = self.plotChannel(
            hsv, axarr[rowIndex + 0, colIndex + 1], 1
        )  # saturation
        prevAx3 = self.plotChannel(hsv, axarr[rowIndex + 0, colIndex + 2], 2)  # value
        self.plotChannel(rgb, axarr[rowIndex + 1, colIndex + 0], 0, prevAx1)  # red
        self.plotChannel(rgb, axarr[rowIndex + 1, colIndex + 1], 1, prevAx2)  # green
        self.plotChannel(rgb, axarr[rowIndex + 1, colIndex + 2], 2, prevAx3)  # blue

    def fixPath(self, path):
        if not path.endswith(".pdf"):
            path = path + ".pdf"
        return path

    def addInfos(self, pdf, infos):
        pdfinfo = pdf.infodict()
        for key, info in infos.items():
            pdfinfo[key] = info

    def pixel(self, fig, inch):
        return int(inch * fig.dpi)

    def startPDF(self, path):
        path = self.fixPath(path)
        return PdfPages(path)

    def finishPDFPage(self, pdf):
        plt.tight_layout()
        pdf.savefig()
        plt.close()

    def finishPDF(self, pdf, infos={}):
        self.addInfos(pdf, infos)

    def createHistogramPDF(self, path, plotType=PlotType.HISTOGRAMM, infos={}):
        imageIndex = 0
        with self.startPDF(path) as pdf:
            self.pages = len(self.images) // self.imagesPerPage
            for page in range(self.pages + 1):
                colTitles = ["image", "", "hue/blue", "saturation/green", "value/red"]
                if plotType == PlotType.PLOT:
                    colTitles = ["image", "", "diffSum", ""]
                cols = len(colTitles)
                rows = self.imagesPerPage * 2
                fig, axarr = plt.subplots(rows, cols, figsize=self.pagesize)
                if fig is None:
                    pass
                for ax, colTitle in zip(axarr[0], colTitles):
                    ax.set_title(colTitle)
                thumbNailSize = 512  # self.pixel(fig,self.pageheight)
                for pageImageIndex in range(0, self.imagesPerPage):
                    if imageIndex < len(self.images):
                        image, imageTitle, xvalues, yvalues = self.images[imageIndex]
                        # see https://matplotlib.org/3.1.1/tutorials/intermediate/tight_layout_guide.html#sphx-glr-tutorials-intermediate-tight-layout-guide-py
                        axImage = plt.subplot2grid(
                            (rows, cols), (pageImageIndex * 2, 0), colspan=2, rowspan=2
                        )
                        self.plotImage(axImage, image, imageTitle, thumbNailSize)
                        if plotType == PlotType.HISTOGRAMM:
                            self.plotHistogramm(image, axarr, pageImageIndex * 2, 2)
                        else:
                            axPlot = plt.subplot2grid(
                                (rows, cols),
                                (pageImageIndex * 2, 2),
                                colspan=2,
                                rowspan=2,
                            )
                            axPlot.plot(xvalues, yvalues)

                    else:
                        for col in range(cols):
                            axarr[pageImageIndex * 2, col].remove()
                            axarr[pageImageIndex * 2 + 1, col].remove()
                    imageIndex = imageIndex + 1
                # plt.title("page %d" %(page))
                self.finishPDFPage(pdf)
            self.finishPDF(pdf, infos)

__init__(title, pagesize, imagesPerPage=4)

Constructor

Source code in pcwawc/plotlib.py
26
27
28
29
30
31
32
33
34
def __init__(self, title, pagesize, imagesPerPage=4):
    """
    Constructor
    """
    self.title = title
    self.pagesize = pagesize
    self.pagewidth, self.pageheight = self.pagesize
    self.imagesPerPage = imagesPerPage
    self.images = []

PlotType

Bases: IntEnum

kind of Plot

Source code in pcwawc/plotlib.py
14
15
16
17
18
class PlotType(IntEnum):
    """kind of Plot"""

    HISTOGRAMM = 0
    PLOT = 1

runningstats

ColorStats

calculate the RunningStats for 3 color channels like RGB or HSV simultaneously

Source code in pcwawc/runningstats.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
@implementer(IStats)
class ColorStats:
    """calculate the RunningStats for 3 color channels like RGB or HSV simultaneously"""

    def __init__(self):
        self.c1Stats = RunningStats()
        self.c2Stats = RunningStats()
        self.c3Stats = RunningStats()

    def clear(self):
        self.c1Stats.clear()
        self.c2Stats.clear()
        self.c3Stats.clear()

    def push(self, c1, c2, c3):
        self.c1Stats.push(c1)
        self.c2Stats.push(c2)
        self.c3Stats.push(c3)

    def mean(self):
        return (self.c1Stats.mean(), self.c2Stats.mean(), self.c3Stats.mean())

    def variance(self):
        return (
            self.c1Stats.variance(),
            self.c2Stats.variance(),
            self.c3Stats.variance(),
        )

    def standard_deviation(self):
        return (
            self.c1Stats.standard_deviation(),
            self.c2Stats.standard_deviation(),
            self.c3Stats.standard_deviation(),
        )

    @staticmethod
    def square(value):
        return value * value

    def colorKey(self):
        other = ColorStats()
        other.push(0, 0, 0)
        return ColorStats.distance(self, other)

    def rgbColorKey(self):
        from pcwawc import ciede2000

        value = ciede2000.ciede2000FromRGB(self.mean(), (0, 0, 0))
        return value

    @staticmethod
    def distance(this, other):
        """simple eucledian color distance see e.g. https://en.wikipedia.org/wiki/Color_difference"""
        c1s = ColorStats.square(this.c1Stats.mean() - other.c1Stats.mean())
        c2s = ColorStats.square(this.c2Stats.mean() - other.c2Stats.mean())
        c3s = ColorStats.square(this.c3Stats.mean() - other.c3Stats.mean())
        dist = c1s + c2s + c3s
        return dist

distance(this, other) staticmethod

simple eucledian color distance see e.g. https://en.wikipedia.org/wiki/Color_difference

Source code in pcwawc/runningstats.py
206
207
208
209
210
211
212
213
@staticmethod
def distance(this, other):
    """simple eucledian color distance see e.g. https://en.wikipedia.org/wiki/Color_difference"""
    c1s = ColorStats.square(this.c1Stats.mean() - other.c1Stats.mean())
    c2s = ColorStats.square(this.c2Stats.mean() - other.c2Stats.mean())
    c3s = ColorStats.square(this.c3Stats.mean() - other.c3Stats.mean())
    dist = c1s + c2s + c3s
    return dist

IStats

Bases: Interface

statistics interface

Source code in pcwawc/runningstats.py
11
12
13
14
15
16
17
18
19
20
class IStats(Interface):
    """statistics interface"""

    def push(self, value):
        """push a value to the statistics"""
        pass

    def mean(self):
        """get the mean value"""
        pass

mean()

get the mean value

Source code in pcwawc/runningstats.py
18
19
20
def mean(self):
    """get the mean value"""
    pass

push(value)

push a value to the statistics

Source code in pcwawc/runningstats.py
14
15
16
def push(self, value):
    """push a value to the statistics"""
    pass

MinMaxStats

Bases: RunningStats, MinMaxMixin

running statistics with minimum and maximum

Source code in pcwawc/runningstats.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class MinMaxStats(RunningStats, MinMaxMixin):
    """running statistics with minimum and maximum"""

    def __init__(self):
        super(MinMaxStats, self).__init__()
        super().initMinMax()

    def push(self, value):
        super().push(value)
        super().pushMinMax(value)

    def clear(self):
        super().clear()
        super().initMinMax()

    def formatMinMax(self, formatR="%d: %.1f ± %.1f", formatM=" %.1f - %.1f"):
        text = super().format(formatR)
        if self.n > 0:
            text += super().formatMinMax(formatM)
        return text

MovingAverage

calculate a moving average

Source code in pcwawc/runningstats.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@implementer(IStats)
class MovingAverage:
    """calculate a moving average"""

    def __init__(self, maxlen):
        self.maxlen = maxlen
        self.d = deque(maxlen=maxlen)
        self.sum = 0
        self.n = 0
        self.value = None

    def push(self, value):
        """recalculate the Moving Average based on a new value"""
        self.value = value
        self.sum += value
        if self.n < self.maxlen:
            self.n += 1
        else:
            self.sum -= self.d.popleft()
        self.d.append(value)

    def gradient(self):
        if self.n >= 2:
            g = (self.d[self.n - 1] - self.d[0]) / (self.n - 1)
            return g
        else:
            return 0

    def mean(self):
        if self.n == 0:
            return None
        return self.sum / self.n

    def __str__(self):
        return self.format()

    def format(self, formatM="%.1f"):
        text = formatM % self.mean()
        return text

push(value)

recalculate the Moving Average based on a new value

Source code in pcwawc/runningstats.py
34
35
36
37
38
39
40
41
42
def push(self, value):
    """recalculate the Moving Average based on a new value"""
    self.value = value
    self.sum += value
    if self.n < self.maxlen:
        self.n += 1
    else:
        self.sum -= self.d.popleft()
    self.d.append(value)

RunningStats

calculate mean, variance and standard deviation in one pass using Welford's algorithm

Source code in pcwawc/runningstats.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@implementer(IStats)
class RunningStats:
    """calculate mean, variance and standard deviation in one pass using Welford's algorithm"""

    def __init__(self):
        self.n = 0
        self.old_m = 0
        self.new_m = 0
        self.old_s = 0
        self.new_s = 0

    def clear(self):
        self.n = 0

    def push(self, xvalue):
        self.n += 1
        x = float(xvalue)

        if self.n == 1:
            self.old_m = self.new_m = x
            self.old_s = 0
        else:
            self.new_m = self.old_m + (x - self.old_m) / self.n
            self.new_s = self.old_s + (x - self.old_m) * (x - self.new_m)

            self.old_m = self.new_m
            self.old_s = self.new_s

    def mean(self):
        return self.new_m if self.n else 0.0

    def variance(self):
        return self.new_s / (self.n - 1) if self.n > 1 else 0.0

    def standard_deviation(self):
        return math.sqrt(self.variance())

    def __str__(self):
        return self.format()

    def format(self, formatS="%d: %.1f ± %.1f"):
        m = self.mean()
        s = self.standard_deviation()
        text = formatS % (self.n, m, s)
        return text

simpledetector

Created on 2019-12-07

@author: tk

ImageChange

detect change of a single image

Source code in pcwawc/simpledetector.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class ImageChange:
    """detect change of a single image"""

    thresh = 150
    gradientDelta = 0.725
    averageWindow = 4

    def __init__(self):
        self.stats = MinMaxStats()
        self.movingAverage = MovingAverage(ImageChange.averageWindow)
        self.clear()

    def clear(self, newState=ChangeState.CALIBRATING):
        self.cbReferenceBW = None
        self.stats.clear()
        self.changeState = newState
        self.stableCounter = 0

    def transitionToPreMove(self):
        self.changeState = ChangeState.PRE_MOVE
        self.minInMove = self.pixelChanges
        self.maxInMove = self.pixelChanges

    def check(self, cbImage):
        self.makeGray(cbImage)
        self.calcDifference()
        if self.hasReference:
            self.calcPixelChanges()

    def makeGray(self, cbImage):
        self.cbImage = cbImage
        image = cbImage.image
        imageGray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        self.cbImageGray = ChessBoardImage(
            cv2.cvtColor(imageGray, cv2.COLOR_GRAY2BGR), "gray"
        )
        # @TODO Make the treshold 150 configurable
        thresh = ImageChange.thresh
        (thresh, self.imageBW) = cv2.threshold(imageGray, thresh, 255, cv2.THRESH_TRUNC)
        self.cbImageBW = ChessBoardImage(
            cv2.cvtColor(self.imageBW, cv2.COLOR_GRAY2BGR), "bw"
        )

    def calcDifference(self):
        self.updateReference(self.cbImageBW)
        self.cbDiffImage = self.cbImageBW.diffBoardImage(self.cbReferenceBW)

    def updateReference(self, cbImageBW, force=False):
        self.hasReference = not self.cbReferenceBW is None
        if not self.hasReference or force:
            self.cbReferenceBW = cbImageBW

    def calcPixelChanges(self):
        self.pixelChanges = (
            cv2.norm(self.cbImageBW.image, self.cbReferenceBW.image, cv2.NORM_L1)
            / self.cbImageBW.pixels
        )
        self.movingAverage.push(self.pixelChanges)
        self.stats.push(self.pixelChanges)

    def isStable(self):
        self.delta = abs(self.movingAverage.gradient())
        stable = self.delta < ImageChange.gradientDelta
        if stable:
            self.stableCounter += 1
        else:
            self.stableCounter = 0
        return stable

    def __str__(self):
        delta = self.movingAverage.gradient()
        text = "%14s: %5.1f Δ: %5.1f Ø: %s/%s, Σ: %d" % (
            self.changeState.title(),
            self.pixelChanges,
            delta,
            self.movingAverage.format(formatM="%5.1f"),
            self.stats.formatMinMax(
                formatR="%4d: %5.1f ± %5.1f", formatM=" %5.1f - %5.1f"
            ),
            self.cbImageBW.pixels,
        )
        return text

Simple8x8Detector

Bases: SimpleDetector

a simple treshold per field detector

Source code in pcwawc/simpledetector.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
@implementer(IMoveDetector)
class Simple8x8Detector(SimpleDetector):
    """a simple treshold per field detector"""

    # construct me
    def __init__(self):
        super().__init__()

    def setup(self, name, vision):
        super().setup(name, vision)
        self.board = self.vision.board
        self.imageChanges = {}
        for square in self.board.genSquares():
            self.imageChanges[square.an] = ImageChange()

    def onChessBoardImage(self, imageEvent):
        super().onChessBoardImage(imageEvent)
        cbImageSet = imageEvent.cbImageSet
        vision = cbImageSet.vision
        ic = self.imageChange
        cs = ic.changeState
        if vision.warp.warping and cs == ChangeState.PRE_MOVE:
            self.calcChanges(cbImageSet)
            if ic.delta < ImageChange.gradientDelta / 2:
                ic.updateReference(ic.cbImageBW, force=True)

    def calcChanges(self, cbImageSet):
        cbWarped = cbImageSet.cbWarped
        # TODO only do once ...
        self.board.divideInSquares(cbWarped.width, cbWarped.height)
        # calculate pixelChanges per square based on parts of the bigger images created by the super class
        for square in self.board.genSquares():
            ic = self.imageChanges[square.an]
            ic.cbImageBW = ChessBoardImage(
                square.getSquareImage(self.imageChange.cbImageBW), square.an
            )
            ic.updateReference(ic.cbImageBW)
            if ic.hasReference:
                ic.calcPixelChanges()
                # if self.vision.debug:
                # print ("%4d %s: %s" % (cbImageSet.frameIndex,square.an,ic))

    def showDebug(self, limit=6):
        changesByValue = OrderedDict(
            sorted(
                self.imageChanges.items(), key=lambda x: x[1].pixelChanges, reverse=True
            )
        )
        ans = list(changesByValue.keys())[:limit]
        for an in ans:
            ic = self.imageChanges[an]
            print("%s: %s" % (an, ic))
        pass

    def onMoveDetected(self, cbImageSet):
        self.calcChanges(cbImageSet)
        changesByValue = OrderedDict(
            sorted(
                self.imageChanges.items(), key=lambda x: x[1].pixelChanges, reverse=True
            )
        )
        keys = list(changesByValue.keys())
        for changeIndex in range(4):
            change = (keys[0], keys[changeIndex + 1])
            if self.vision.debug:
                print(
                    "frame %4d: potential move for squares %s"
                    % (cbImageSet.frameIndex, str(change))
                )
            move = self.vision.board.changeToMove(change)
            # did we find a move?
            if move is None:
                if self.vision.debug:
                    print("change %s has no valid move" % (str(change)))
            else:
                break
        if self.debug:
            self.showDebug()
        if move is None:
            if self.vision.debug:
                print(
                    "frame %4d: giving up on move detection" % (cbImageSet.frameIndex)
                )
        else:
            super().onMoveDetected(cbImageSet)
            for square in self.board.genSquares():
                ic = self.imageChanges[square.an]
                ic.clear()
            self.fire(move=move)

SimpleDetector

Bases: Observable

a simple treshold detector

Source code in pcwawc/simpledetector.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
@implementer(IMoveDetector)
class SimpleDetector(Observable):
    """a simple treshold detector"""

    calibrationWindow = 3

    def __init__(self):
        """construct me"""
        # make me observable
        super(SimpleDetector, self).__init__()
        self.debug = False
        self.frameDebug = False
        pass

    def setup(self, name, vision):
        self.name = name
        self.vision = vision
        self.imageChange = ImageChange()

    def onChessBoardImage(self, imageEvent):
        cbImageSet = imageEvent.cbImageSet
        vision = cbImageSet.vision
        if vision.warp.warping:
            cbWarped = cbImageSet.cbWarped
            start = timer()
            self.imageChange.check(cbWarped)
            ic = self.imageChange
            endt = timer()
            cbImageSet.cbDebug = cbImageSet.debugImage2x2(
                cbWarped, ic.cbImageGray, ic.cbImageBW, ic.cbDiffImage
            )
            if self.imageChange.hasReference:
                self.updateState(cbImageSet)
                if self.frameDebug:
                    print(
                        "Frame %5d %.3f s:%s"
                        % (cbImageSet.frameIndex, endt - start, ic)
                    )

    def updateState(self, cbImageSet):
        ic = self.imageChange
        ics = ic.changeState
        if ics == ChangeState.CALIBRATING:
            # leave calibrating when enough stable values are available
            if ic.isStable() and ic.stableCounter >= SimpleDetector.calibrationWindow:
                ic.transitionToPreMove()
        elif ics == ChangeState.PRE_MOVE:
            if not ic.isStable():
                ic.changeState = ChangeState.IN_MOVE
            else:
                ic.transitionToPreMove()
        elif ics == ChangeState.IN_MOVE:
            ic.maxInMove = max(ic.maxInMove, ic.pixelChanges)
            peak = ic.maxInMove - ic.minInMove
            dist = ic.pixelChanges - ic.minInMove
            if peak > 0:
                relativePeak = dist / peak
                if ic.isStable():
                    if self.frameDebug:
                        print("%.1f %%" % (relativePeak * 100))
                    # @TODO make configurable
                    if relativePeak < 0.16 or (relativePeak < 0.35 and ic.delta < 0.1):
                        self.onMoveDetected(cbImageSet)

    def onMoveDetected(self, cbImageSet):
        self.imageChange.clear()
        pass

__init__()

construct me

Source code in pcwawc/simpledetector.py
110
111
112
113
114
115
116
def __init__(self):
    """construct me"""
    # make me observable
    super(SimpleDetector, self).__init__()
    self.debug = False
    self.frameDebug = False
    pass

video

Video

Video handling e.g. recording/writing

Source code in pcwawc/video.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
class Video:
    """Video handling e.g. recording/writing"""

    @staticmethod
    def getVideo():
        video = Video()
        video.headless = Environment.inContinuousIntegration()
        return video

    # construct me with no parameters
    def __init__(self, title="frame"):
        self.title = title
        self.cap = None
        self.frames = 0
        self.ispaused = False
        # current Frame
        self.frame = None
        self.processedFrame = None
        self.maxFrames = sys.maxsize
        # still image ass video feature for jpg
        self.autoPause = False
        self.fpsCheck = None
        self.debug = False
        self.headless = False
        pass

    # check whether s is an int
    @staticmethod
    def is_int(s):
        try:
            int(s)
            return True
        except ValueError:
            return False

    @staticmethod
    def title(device):
        if not Video.is_int(device):
            deviceTitle = os.path.basename(device)
        else:
            deviceTitle = "camera %s" % (device)
        return deviceTitle

    # return if video is paused
    def paused(self):
        return self.ispaused

    # pause the video
    def pause(self, ispaused):
        self.ispaused = ispaused

    # capture from the given device
    def capture(self, device):
        if Video.is_int(device):
            self.device = int(device)
        else:
            self.device = device
            self.open(device)
            if device.endswith(".jpg"):
                self.maxFrames = 1
                self.autoPause = True
        self.setup(cv2.VideoCapture(self.device))

    def setup(self, cap):
        """setup the capturing from the given device"""
        self.width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.fps = int(cap.get(cv2.CAP_PROP_FPS))
        self.cap = cap
        self.fpsCheck = FPSCheck()
        self.fpsCheck.start()

    def checkFilePath(self, filePath, raiseException=True):
        ok = os.path.exists(filePath)
        if raiseException and not ok:
            raise Exception("file %s does not exist" % (filePath))
        return ok

    # capture from the given video filePath
    def open(self, filePath):
        self.checkFilePath(filePath)
        self.setup(cv2.VideoCapture(filePath))

    def showImage(self, image, title: str, keyCheck: bool = True, keyWait: int = 5):
        """
        show the image with the given title

        Args:
            image: the image to show
            title(str): the title of the image
            keyCheck(bool): wait for a a key stroke before continuing?
            keyWait(int): maximum number of seconds to wait for a key stroke
        """
        if not threading.current_thread() is threading.main_thread():
            if self.debug:
                print("can't show image %s since not on mainthread" % (title))
            return True
        if self.headless:
            return True
        cv2.imshow(title, image)
        if keyCheck:
            return not cv2.waitKey(keyWait) & 0xFF == ord("q")
        else:
            return True

    def showAndWriteImage(
        self, image, title, path="/tmp/", imageFormat=".jpg", keyCheck=True, keyWait=5
    ):
        result = self.showImage(image, title, keyCheck, keyWait)
        if image is not None:
            cv2.imshow(title, image)
            cv2.imwrite(path + title + imageFormat, image)
        return result

    # encode the image
    def imencode(self, frame, imgformat=".jpg"):
        # encode the frame in JPEG format
        (flag, encodedImage) = cv2.imencode(imgformat, frame)
        return flag, encodedImage

    # return a video frame as a jpg image
    def readJpgImage(self, show=False, postProcess=None):
        ret, frame, quitWanted = self.readFrame(show, postProcess)
        encodedImage = None
        # ensure the frame was read
        if ret:
            (flag, encodedImage) = self.imencode(frame)
            # ensure the frame was successfully encoded
            if not flag:
                ret = False
        return ret, encodedImage, quitWanted

    # return a video frame as a numpy array
    def readFrame(self, show=False, postProcess=None):
        # when pausing repeat previous frame
        if self.ispaused:
            # simply return the current frame again
            ret = self.frame is not None
        else:
            ret, self.frame = self.cap.read()
        quitWanted = False
        if ret == True:
            if not self.ispaused:
                self.frames = self.frames + 1
                if self.frames >= self.maxFrames and self.autoPause:
                    self.ispaused = True
                self.fpsCheck.update()
            if not postProcess is None:
                try:
                    self.processedFrame = postProcess(self.frame)
                except BaseException as e:
                    # @TODO log exception
                    print("processing error " + str(e))
                    self.processedFrame = self.frame
            else:
                self.processedFrame = self.frame
            if show:
                quitWanted = not self.showImage(self.frame, self.title)
        return ret, self.processedFrame, quitWanted

    # play the given capture
    def play(self):
        while self.cap.isOpened():
            ret, frame, quitWanted = self.readFrame(True)
            if ret == True:
                if quitWanted:
                    break
                if frame is None:
                    # TODO decide whether to log a warning here
                    pass
            else:
                break
        self.close()

    def fileTimeStamp(self):
        return self.timeStamp(separator="_", timeseparator="")

    def timeStamp(self, separator=" ", timeseparator=":"):
        return strftime(
            "%Y-%m-%d" + separator + "%H" + timeseparator + "%M" + timeseparator + "%S"
        )

    def close(self):
        if self.cap is not None:
            self.cap.release()
        cv2.destroyAllWindows()

    def checkCap(self):
        if self.cap is None:
            raise "Capture is not initialized"

    # get a still image
    def still(
        self,
        prefix,
        imgformat="jpg",
        close=True,
        printHints=True,
        show=False,
        postProcess=None,
    ):
        filename = "%s%s.%s" % (prefix, self.fileTimeStamp(), imgformat)
        return self.still2File(
            filename,
            format=format,
            close=close,
            printHints=printHints,
            show=show,
            postProcess=postProcess,
        )

    # get a still image
    def still2File(
        self,
        filename,
        format="jpg",
        close=True,
        printHints=True,
        show=False,
        postProcess=None,
    ):
        self.checkCap()
        ret = False
        frame = None
        if self.cap.isOpened():
            ret, frame, quitWanted = self.readFrame(show, postProcess)
            if ret == True:
                if printHints:
                    print("capture %s with %dx%d" % (filename, self.width, self.height))
                self.writeImage(frame, filename)
            if close:
                self.close()
        return ret, frame

    # read an image
    def readImage(self, filePath):
        self.checkFilePath(filePath)
        image = cv2.imread(filePath, 1)
        return image

    def writeImage(self, image, filepath):
        cv2.imwrite(filepath, image)

    def prepareRecording(self, filename, width, height, fps=None):
        self.checkCap()
        if fps is None:
            fps = self.fps
        # Define the codec and create VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*"XVID")
        out = cv2.VideoWriter(filename, fourcc, fps, (width, height))
        return out

    # record the capture to a file with the given prefix using a timestamp
    def record(self, prefix, printHints=True, fps=None):
        filename = "%s%s.avi" % (prefix, self.timeStamp())
        out = self.prepareRecording(filename, self.width, self.height, fps)

        if printHints:
            print(
                "recording %s with %dx%d at %d fps press q to stop recording"
                % (filename, self.width, self.height, self.fps)
            )

        while self.cap.isOpened():
            ret, frame, quitWanted = self.readFrame(True)
            if ret == True:
                # flip the frame
                # frame = cv2.flip(frame,0)
                if quitWanted:
                    break
                # write the  frame
                out.write(frame)
            else:
                break

        # Release everything if job is finished
        self.close()
        out.release()
        cv2.destroyAllWindows()
        if printHints:
            print("finished")

    # https://stackoverflow.com/a/22921648/1497139
    def createBlank(self, width, height, rgb_color=(0, 0, 0)):
        """Create new image(numpy array) filled with certain color in RGB"""
        # Create black blank image
        image = self.getEmptyImage4WidthAndHeight(width, height, 3)

        # Since OpenCV uses BGR, convert the color first
        color = tuple(reversed(rgb_color))
        # Fill image with color
        image[:] = color

        return image

    def getEmptyImage4WidthAndHeight(self, w, h, channels):
        """get an empty image with the given width height and channels"""
        emptyImage = np.zeros((h, w, channels), np.uint8)
        return emptyImage

    def getEmptyImage(self, image, channels=1):
        """prepare a trapezoid/polygon mask to focus on the square chess field seen as a trapezoid"""
        h, w = image.shape[:2]
        emptyImage = self.getEmptyImage4WidthAndHeight(w, h, channels)
        return emptyImage

    def maskImage(self, image, mask):
        """return the masked image that filters with the given mask"""
        masked = cv2.bitwise_and(image, image, mask=mask)
        return masked

    # was: http://www.robindavid.fr/opencv-tutorial/chapter5-line-edge-and-contours-detection.html
    # is: https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_imgproc/py_houghlines/py_houghlines.html
    # https://docs.opencv.org/3.4/d9/db0/tutorial_hough_lines.html
    def houghTransform(self, image):
        """Performs an Hough Transform to given image.

        Returns: lines"""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150, apertureSize=3)
        lines = cv2.HoughLines(edges, 1, np.pi / 180, 200)
        return lines

    def houghTransformP(self, image):
        """Performs a probabilistic Hough Transform to given image.

        Returns: lines"""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150, apertureSize=3)
        h, w = image.shape[:2]
        minLineLength = h / 16
        maxLineGap = h / 24
        lines = cv2.HoughLinesP(edges, 1, np.pi / 180, 100, minLineLength, maxLineGap)
        return lines

    def drawTrapezoid(self, image, points, color):
        """loop over the given points and draw them on the image"""
        if points is None:
            return
        prev = None
        # if there is exactly four points then close the loop
        if len(points) == 4:
            points.append(points[0])
        for x, y in points:
            cv2.circle(image, (x, y), 10, color, -1)
            if prev is not None:
                cv2.line(image, (x, y), prev, color, 3, cv2.LINE_AA)
            prev = (x, y)

    def drawCircle(self, image, center, radius=10, color=(0, 255, 0), thickness=1):
        cv2.circle(image, center, radius, color=color, thickness=thickness)

    def drawRectangle(self, image, pt1, pt2, color=(0, 255, 0), thickness=1):
        cv2.rectangle(image, pt1, pt2, color, thickness)

    def drawPolygon(self, image, polygon, color):
        """draw the given polygon onto the given image with the given color"""
        cv2.fillConvexPoly(image, polygon, color)

    #  https://docs.opencv.org/4.1.2/d9/db0/tutorial_hough_lines.html
    def drawLines(self, image, lines):
        height, width = image.shape[:2]
        for i in range(0, len(lines)):
            rho = lines[i][0][0]
            theta = lines[i][0][1]
            a = math.cos(theta)
            b = math.sin(theta)
            x0 = a * rho
            y0 = b * rho
            pt1 = (int(x0 + width * (-b)), int(y0 + height * (a)))
            pt2 = (int(x0 - width * (-b)), int(y0 - height * (a)))
            cv2.line(image, pt1, pt2, (0, 0, 255), 3, cv2.LINE_AA)

    def rotate(self, image, angle, center=None, scale=1.0):
        # grab the dimensions of the image
        (h, w) = image.shape[:2]

        # if the center is None, initialize it as the center of
        # the image
        if center is None:
            center = (w // 2, h // 2)

        # perform the rotation (clockwise)
        M = cv2.getRotationMatrix2D(center, -angle, scale)
        rotated = cv2.warpAffine(image, M, (w, h))

        # return the rotated image
        return rotated

    def warp(self, image, pts, squared=True):
        """apply the four point transform to obtain a birds eye view of the given image"""
        warped = perspective.four_point_transform(image, pts)
        if squared:
            height, width = warped.shape[:2]
            side = min(width, height)
            warped = cv2.resize(warped, (side, side))
        return warped

    def as2x2(self, row1col1, row1col2, row2col1, row2col2, downScale=2):
        height, width = row1col1.shape[:2]
        image1, image2, image3, image4 = row1col1, row1col2, row2col1, row2col2
        if downScale > 1:
            image1 = cv2.resize(image1, (width // downScale, height // downScale))
            image2 = cv2.resize(image2, (width // downScale, height // downScale))
            image3 = cv2.resize(image3, (width // downScale, height // downScale))
            image4 = cv2.resize(image4, (width // downScale, height // downScale))

        combined1 = np.concatenate((image1, image2), axis=0)
        combined2 = np.concatenate((image3, image4), axis=0)
        combined = np.concatenate((combined1, combined2), axis=1)
        return combined

    @staticmethod
    def getSubRect(image, rect):
        x, y, w, h = rect
        return image[y : y + h, x : x + w]

    # get the intensity sum of a hsv image
    def sumIntensity(self, image):
        h, s, v = cv2.split(image)
        height, width = image.shape[:2]
        sumResult = np.sum(v)
        return sumResult

    # add a timeStamp to the given frame fontScale 1.0
    def addTimeStamp(
        self,
        frame,
        withFrames=True,
        withFPS=True,
        fontBGRColor=(0, 255, 0),
        fontScale=1.0,
        font=cv2.FONT_HERSHEY_SIMPLEX,
        lineThickness=1,
    ):
        if frame is not None:
            height, width = frame.shape[:2]
            # grab the current time stamp and draw it on the frame
            now = self.timeStamp()
            if withFrames:
                now = now + " %d" % (self.frames)
            if withFPS and self.fpsCheck is not None:
                now = now + "@%.0f fps" % (self.fpsCheck.fps())
            fontFactor = width / 960
            text_width, text_height = cv2.getTextSize(
                now, font, fontScale * fontFactor, lineThickness
            )[0]
            # https://stackoverflow.com/a/34273603/1497139
            # frame = frame.copy()
            self.drawText(
                frame,
                now,
                (width - int(text_width * 1.1), int(text_height * 1.2)),
                font,
                fontScale * fontFactor,
                fontBGRColor,
                lineThickness,
            )
        return frame

    def drawCenteredText(
        self,
        frame,
        text,
        x,
        y,
        fontBGRColor=(0, 255, 0),
        fontScale=1.0,
        font=cv2.FONT_HERSHEY_SIMPLEX,
        lineThickness=1,
    ):
        height, width = frame.shape[:2]
        fontFactor = width / 960
        text_width, text_height = cv2.getTextSize(
            text, font, fontScale * fontFactor, lineThickness
        )[0]
        self.drawText(
            frame,
            text,
            (x - text_width // 2, y + text_height // 2),
            font,
            fontScale * fontFactor,
            fontBGRColor,
            lineThickness,
        )

    def drawText(
        self,
        frame,
        text,
        bottomLeftCornerOfText,
        font,
        fontScale,
        fontBGRColor,
        lineThickness,
    ):
        cv2.putText(
            frame,
            text,
            bottomLeftCornerOfText,
            font,
            fontScale,
            fontBGRColor,
            lineThickness,
        )

createBlank(width, height, rgb_color=(0, 0, 0))

Create new image(numpy array) filled with certain color in RGB

Source code in pcwawc/video.py
302
303
304
305
306
307
308
309
310
311
312
def createBlank(self, width, height, rgb_color=(0, 0, 0)):
    """Create new image(numpy array) filled with certain color in RGB"""
    # Create black blank image
    image = self.getEmptyImage4WidthAndHeight(width, height, 3)

    # Since OpenCV uses BGR, convert the color first
    color = tuple(reversed(rgb_color))
    # Fill image with color
    image[:] = color

    return image

drawPolygon(image, polygon, color)

draw the given polygon onto the given image with the given color

Source code in pcwawc/video.py
374
375
376
def drawPolygon(self, image, polygon, color):
    """draw the given polygon onto the given image with the given color"""
    cv2.fillConvexPoly(image, polygon, color)

drawTrapezoid(image, points, color)

loop over the given points and draw them on the image

Source code in pcwawc/video.py
354
355
356
357
358
359
360
361
362
363
364
365
366
def drawTrapezoid(self, image, points, color):
    """loop over the given points and draw them on the image"""
    if points is None:
        return
    prev = None
    # if there is exactly four points then close the loop
    if len(points) == 4:
        points.append(points[0])
    for x, y in points:
        cv2.circle(image, (x, y), 10, color, -1)
        if prev is not None:
            cv2.line(image, (x, y), prev, color, 3, cv2.LINE_AA)
        prev = (x, y)

getEmptyImage(image, channels=1)

prepare a trapezoid/polygon mask to focus on the square chess field seen as a trapezoid

Source code in pcwawc/video.py
319
320
321
322
323
def getEmptyImage(self, image, channels=1):
    """prepare a trapezoid/polygon mask to focus on the square chess field seen as a trapezoid"""
    h, w = image.shape[:2]
    emptyImage = self.getEmptyImage4WidthAndHeight(w, h, channels)
    return emptyImage

getEmptyImage4WidthAndHeight(w, h, channels)

get an empty image with the given width height and channels

Source code in pcwawc/video.py
314
315
316
317
def getEmptyImage4WidthAndHeight(self, w, h, channels):
    """get an empty image with the given width height and channels"""
    emptyImage = np.zeros((h, w, channels), np.uint8)
    return emptyImage

houghTransform(image)

Performs an Hough Transform to given image.

Returns: lines

Source code in pcwawc/video.py
333
334
335
336
337
338
339
340
def houghTransform(self, image):
    """Performs an Hough Transform to given image.

    Returns: lines"""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150, apertureSize=3)
    lines = cv2.HoughLines(edges, 1, np.pi / 180, 200)
    return lines

houghTransformP(image)

Performs a probabilistic Hough Transform to given image.

Returns: lines

Source code in pcwawc/video.py
342
343
344
345
346
347
348
349
350
351
352
def houghTransformP(self, image):
    """Performs a probabilistic Hough Transform to given image.

    Returns: lines"""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150, apertureSize=3)
    h, w = image.shape[:2]
    minLineLength = h / 16
    maxLineGap = h / 24
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, 100, minLineLength, maxLineGap)
    return lines

maskImage(image, mask)

return the masked image that filters with the given mask

Source code in pcwawc/video.py
325
326
327
328
def maskImage(self, image, mask):
    """return the masked image that filters with the given mask"""
    masked = cv2.bitwise_and(image, image, mask=mask)
    return masked

setup(cap)

setup the capturing from the given device

Source code in pcwawc/video.py
82
83
84
85
86
87
88
89
def setup(self, cap):
    """setup the capturing from the given device"""
    self.width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    self.height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    self.fps = int(cap.get(cv2.CAP_PROP_FPS))
    self.cap = cap
    self.fpsCheck = FPSCheck()
    self.fpsCheck.start()

showImage(image, title, keyCheck=True, keyWait=5)

show the image with the given title

Parameters:

Name Type Description Default
image

the image to show

required
title(str)

the title of the image

required
keyCheck(bool)

wait for a a key stroke before continuing?

required
keyWait(int)

maximum number of seconds to wait for a key stroke

required
Source code in pcwawc/video.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def showImage(self, image, title: str, keyCheck: bool = True, keyWait: int = 5):
    """
    show the image with the given title

    Args:
        image: the image to show
        title(str): the title of the image
        keyCheck(bool): wait for a a key stroke before continuing?
        keyWait(int): maximum number of seconds to wait for a key stroke
    """
    if not threading.current_thread() is threading.main_thread():
        if self.debug:
            print("can't show image %s since not on mainthread" % (title))
        return True
    if self.headless:
        return True
    cv2.imshow(title, image)
    if keyCheck:
        return not cv2.waitKey(keyWait) & 0xFF == ord("q")
    else:
        return True

warp(image, pts, squared=True)

apply the four point transform to obtain a birds eye view of the given image

Source code in pcwawc/video.py
408
409
410
411
412
413
414
415
def warp(self, image, pts, squared=True):
    """apply the four point transform to obtain a birds eye view of the given image"""
    warped = perspective.four_point_transform(image, pts)
    if squared:
        height, width = warped.shape[:2]
        side = min(width, height)
        warped = cv2.resize(warped, (side, side))
    return warped

VideoStream

Bases: object

run videograbbing in separate stream

Source code in pcwawc/video.py
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
class VideoStream(object):
    """run videograbbing in separate stream"""

    def __init__(self, video, show=False, postProcess=None, name="VideoStream"):
        self.video = video
        self.show = show
        self.quit = False
        self.frame = None
        # initialize the thread name
        self.name = name
        self.postProcess = postProcess

        # initialize the variable used to indicate if the thread should
        # be stopped
        self.stopped = False

    def start(self):
        # start the thread to read frames from the video stream
        t = Thread(target=self.update, name=self.name, args=())
        t.daemon = True
        t.start()
        return self

    def update(self):
        # keep looping infinitely until the thread is stopped
        while True:
            # if the thread indicator variable is set, stop the thread
            if self.stopped:
                return

            ret, frame, quitWanted = video.readFrame(self.show, self.postProcess)
            if quitWanted:
                return

            if ret:
                self.frame = frame

    def read(self):
        # return the frame most recently read
        return self.frame

    def stop(self):
        # indicate that the thread should be stopped
        self.stopped = True

videoanalyze

Created on 2019-12-08

@author: wf

VideoAnalyzer

Bases: Observable

analyzer for chessboard videos - may be used from command line or web app

Source code in pcwawc/videoanalyze.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class VideoAnalyzer(Observable):
    """analyzer for chessboard videos - may be used from command line or web app"""

    def __init__(self, args, vision=None, logger=None):
        super(VideoAnalyzer, self).__init__()
        if vision is None:
            self.vision = ChessBoardVision(args)
        else:
            self.vision = vision
        self.logger = logger
        self.args = args
        self.debug = args.debug
        if self.debug:
            self.log("Warp: %s" % (args.warpPointList))

        # not recording
        self.videopath = None
        self.videoout = None

    def open(self):
        self.vision.open(self.args.input)
        video = self.vision.video
        if self.args.startframe > 0:
            if self.debug:
                print("skipping first %d frames" % (self.args.startframe))
            while video.frames <= self.args.startframe:
                self.vision.video.readFrame()

    def close(self):
        if self.videoout is not None:
            self.stopVideoRecording()
        self.vision.close()

    def hasImage(self):
        return self.vision.hasImage

    def hasImageSet(self):
        return self.cbImageSet is not None

    def isRecording(self):
        return self.videoout is not None

    def startVideoRecording(self, path, filename):
        self.open()
        self.videofilename = filename
        # make sure the path exists
        Environment.checkDir(path)
        self.videopath = path + self.videofilename
        return filename

    def stopVideoRecording(self):
        self.videoout.release()
        self.videopath = None
        self.videoout = None
        return self.videofilename

    def videoPause(self):
        ispaused = not self.vision.video.paused()
        self.vision.video.pause(ispaused)
        return ispaused

    def analyze(self):
        self.open()
        if self.args.autowarp:
            self.autoWarp()
        while True:
            cbImageSet = self.nextImageSet()
            if cbImageSet is None:
                break
            if self.debug:
                self.vision.video.showImage(cbImageSet.debugImage().image, "debug")
        pgn = self.vision.board.game.pgn
        self.close()
        return pgn

    def nextImageSet(self):
        self.cbImageSet = self.vision.readChessBoardImage()
        if not self.vision.hasImage:
            return None
        self.processImageSet(self.cbImageSet)
        return self.cbImageSet

    def processImageSet(self, cbImageSet):
        cbImageSet.warpAndRotate(self.args.nowarp)
        # analyze the board if warping is active
        self.fire(cbImageSet=cbImageSet)
        cbImageSet.prepareGUI()
        # do we need to record?
        if self.videopath is not None:
            cbWarped = cbImageSet.cbWarped
            # is the output open?
            if self.videoout is None:
                # create correctly sized output
                video = self.vision.video
                self.videoout = video.prepareRecording(
                    self.videopath, cbWarped.width, cbWarped.height
                )

            self.videoout.write(cbWarped.image)
            self.log("wrote frame %d to recording " % (self.vision.video.frames))
        return

    def findChessBoard(self):
        return self.findTheChessBoard(self.vision.video.frame, self.vision.video)

    def findTheChessBoard(self, image, video):
        finder = BoardFinder(image, video=video)
        corners = finder.findOuterCorners()
        # @FIXME - use property title and frame count instead
        title = "corners_%s.jpg" % (video.fileTimeStamp())
        histograms = finder.getHistograms(image, title, corners)
        finder.expand(image, title, histograms, corners)
        if self.debug:
            corners.showDebug(image, title)
            finder.showPolygonDebug(image, title, corners)
            finder.showHistogramDebug(histograms, title, corners)
        trapez = corners.trapez8x8
        self.vision.warp.pointList = trapez.tolist()
        self.vision.warp.updatePoints()
        return corners

    def log(self, msg):
        if self.debug:
            if self.logger is not None:
                self.logger.info(msg)
            else:
                print(msg)

    def setDebug(self, debug):
        self.debug = debug
        BoardFinder.debug = debug
        Corners.debug = debug
        self.vision.debug = debug
        if self.moveDetector is not None:
            self.moveDetector.debug = debug

    def onMove(self, event):
        move = event.move
        san = self.vision.board.move(move)
        if san is None:
            if self.debug:
                print("invalid move %s" % (str(move)))

    def autoWarp(self):
        self.nextImageSet()
        self.findChessBoard()

    def setUpDetector(self):
        self.moveDetector = MoveDetectorFactory.create(self.args.detector, self.vision)
        self.subscribe(self.moveDetector.onChessBoardImage)
        self.moveDetector.subscribe(self.onMove)

    def changeDetector(self, newDetector):
        self.unsubscribe(self.moveDetector.onChessBoardImage)
        self.moveDetector = newDetector
        self.subscribe(self.moveDetector.onChessBoardImage)

    @staticmethod
    def fromArgs(argv):
        cmdLineArgs = Args("Chessboard Video analyzer")
        args = cmdLineArgs.parse(argv)
        videoAnalyzer = VideoAnalyzer(args)
        videoAnalyzer.setUpDetector()
        videoAnalyzer.setDebug(args.debug)
        return videoAnalyzer

webapp

WebApp

actual Play Chess with a WebCam Application - Flask calls are routed here

Source code in pcwawc/webapp.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
class WebApp:
    """actual Play Chess with a WebCam Application - Flask calls are routed here"""

    debug = False

    # construct me with the given settings
    def __init__(self, args, logger=None):
        """construct me"""
        self.args = args
        self.videoStream = None
        self.videoAnalyzer = VideoAnalyzer(args, logger=logger)
        self.videoAnalyzer.setUpDetector()
        self.board = self.videoAnalyzer.vision.board
        self.setDebug(args.debug)
        self.env = Environment()

    def log(self, msg):
        self.videoAnalyzer.log(msg)

    # return the index.html template content with the given message
    def index(self, msg):
        self.log(msg)
        game = self.board.game
        game.warp = self.videoAnalyzer.vision.warp
        game.save()
        gameid = game.gameid
        return render_template(
            "index.html",
            detector=self.videoAnalyzer.moveDetector,
            detectors=MoveDetectorFactory.detectors,
            message=msg,
            timeStamp=self.videoAnalyzer.vision.video.timeStamp(),
            gameid=gameid,
        )

    def home(self):
        self.videoAnalyzer.vision.video = Video()
        return self.index("Home")

    def photoDownload(self, path, filename):
        #  https://stackoverflow.com/a/24578372/1497139
        return send_from_directory(directory=path, filename=filename)

    def setDebug(self, debug):
        WebApp.debug = debug
        self.videoAnalyzer.setDebug(debug)

    # toggle the debug flag
    def chessDebug(self):
        self.setDebug(not WebApp.debug)
        msg = "debug " + ("on" if WebApp.debug else "off")
        return self.index(msg)

    # automatically find the chess board
    def chessFindBoard(self):
        if self.videoAnalyzer.hasImage():
            try:
                corners = self.videoAnalyzer.findChessBoard()
                msg = "%dx%d found" % (corners.rows, corners.cols)
            except Exception as e:
                msg = str(e)
        else:
            msg = "can't find chess board - video is not active"
        return self.index(msg)

    def chessTakeback(self):
        try:
            msg = "take back"
            if not self.board.takeback():
                msg = "can not take back any more moves"
            if WebApp.debug:
                self.game.showDebug()
            return self.index(msg)
        except BaseException as e:
            return self.indexException(e)

    def chessSave(self):
        gameid = self.board.lockGame()
        msg = "chess game <a href='/chess/games/%s'>%s</a> saved(locked)" % (
            gameid,
            gameid,
        )
        return self.index(msg)

    def chessGameColors(self):
        msg = "color update in progress"
        return self.index(msg)

    def chessForward(self):
        msg = "forward"
        return self.index(msg)

    def indexException(self, e):
        msg = "<span style='color:red'>%s</span>" % str(e)
        return self.index(msg)

    def chessMove(self, move):
        try:
            if "-" in move:
                move = move.replace("-", "")
            self.board.ucimove(move)
            msg = "move %s -> fen= %s" % (move, self.board.fen)
            if WebApp.debug:
                self.game.showDebug()
            return self.index(msg)
        except BaseException as e:
            return self.indexException(e)

    def timeStamp(self):
        return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")

    def chessGameState(self, gameid):
        fen = self.board.fen
        pgn = self.board.game.pgn
        gameStateJson = jsonify(
            fen=fen,
            pgn=pgn,
            gameid=gameid,
            debug=WebApp.debug,
            timestamp=self.timeStamp(),
        )
        # if WebApp.debug:
        #    self.log(gameStateJson)
        return gameStateJson

    def chessSettings(self, args):
        msg = "settings"
        if "detector" in args:
            newDetectorName = args["detector"]
            newDetector = MoveDetectorFactory.create(
                newDetectorName, self.videoAnalyzer.vision
            )
            self.videoAnalyzer.changeDetector(newDetector)
            msg = "changing detector to %s" % (newDetectorName)
        return self.index(msg)

    def chessFEN(self, fen):
        msg = fen
        try:
            self.board.updatePieces(fen)
            msg = "game update from fen %s" % (fen)
            return self.index(msg)
        except BaseException as e:
            return self.indexException(e)

    def chessPgn(self, pgn):
        try:
            self.board.setPgn(pgn)
            msg = "game updated from pgn"
            return self.index(msg)
        except BaseException as e:
            return self.indexException(e)

    # picture has been clicked
    def chessWebCamClick(self, x, y, w, h):
        video = self.videoAnalyzer.vision.video
        if not self.videoAnalyzer.hasImage():
            msg = "no video available"
        else:
            px = x * video.width // w
            py = y * video.height // h
            b, g, r = video.frame[py, px]
            colorInfo = "r:%x g:%x b:%x" % (r, g, b)
            warp = self.videoAnalyzer.vision.warp
            warp.addPoint(px, py)
            msg = (
                "clicked warppoint %d pixel %d,%d %s mouseclick %d,%d in image %d x %d"
                % (len(warp.pointList), px, py, colorInfo, x, y, w, h)
            )
            return self.index(msg)

    def photo(self, path):
        try:
            if self.videoAnalyzer.hasImageSet():
                # make sure the path exists
                Environment.checkDir(path)
                video = self.videoAnalyzer.vision.video
                filename = "chessboard_%s.jpg" % (video.fileTimeStamp())
                video.writeImage(
                    self.videoAnalyzer.cbImageSet.cbGUI.image, path + filename
                )
                msg = "still image <a href='/photo/%s'>%s</a> taken from input %s" % (
                    filename,
                    filename,
                    self.args.input,
                )
            else:
                msg = "no imageset for photo available"
            return self.index(msg)
        except BaseException as e:
            return self.indexException(e)

    def videoRecord(self, path):
        if not self.videoAnalyzer.isRecording():
            video = self.videoAnalyzer.vision.video
            filename = self.videoAnalyzer.startVideoRecording(
                path, "chessgame_%s.avi" % (video.fileTimeStamp())
            )
            msg = "started recording %s" % (filename)
        else:
            filename = self.videoAnalyzer.stopVideoRecording()
            msg = "finished recording " + filename
        return self.index(msg)

    def videoRotate90(self):
        try:
            warp = self.videoAnalyzer.vision.warp
            warp.rotate(90)
            msg = "rotation: %d°" % (warp.rotation)
            return self.index(msg)
        except BaseException as e:
            return self.indexException(e)

    def videoPause(self):
        ispaused = self.videoAnalyzer.videoPause()
        msg = "video " + ("paused" if ispaused else "running")
        return self.index(msg)

    def videoFeed(self):
        self.videoAnalyzer.open()
        # return the response generated along with the specific media
        # type (mime type)
        return Response(
            self.genVideo(self.videoAnalyzer),
            mimetype="multipart/x-mixed-replace; boundary=frame",
        )

    # https://html.spec.whatwg.org/multipage/server-sent-events.html
    # https://stackoverflow.com/a/51969441/1497139
    def getEvent(self):
        """this could be any function that blocks until data is ready"""
        time.sleep(1.0)
        s = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        return s

    def eventFeed(self):
        return Response(self.genEventStream(), mimetype="text/event-stream")

    def genEventStream(self):
        while True:
            # wait for source data to be available, then push it
            yield "data: {}\n\n".format(self.getEvent())

    # streamed video generator
    # @TODO fix this non working code
    # def genVideoStreamed(self, video):
    #    if self.videoStream is None:
    #        self.videoStream = VideoStream(self.video, show=True, postProcess=self.video.addTimeStamp)
    #        self.videoStream.start()
    #    while True:
    #        frame = self.videoStream.read()
    #        if frame is not None:
    #            flag, encodedImage = self.video.imencode(frame)
    #            # ensure we got a valid image
    #            if not flag:
    #                continue
    #            # yield the output frame in the byte format
    #            yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
    #                  bytearray(encodedImage) + b'\r\n')

    # video generator
    def genVideo(self, analyzer):
        if self.args.autowarp:
            analyzer.autoWarp()
        while True:
            cbImageSet = analyzer.nextImageSet()
            if cbImageSet is None:
                break
            guiImage = cbImageSet.cbGUI.image
            if guiImage is not None:
                (flag, encodedImage) = analyzer.vision.video.imencode(guiImage)
                if not flag:
                    self.log("encoding failed")
                else:
                    # yield the output frame in the byte format
                    yield (
                        b"--frame\r\n"
                        b"Content-Type: image/jpeg\r\n\r\n"
                        + bytearray(encodedImage)
                        + b"\r\n"
                    )

__init__(args, logger=None)

construct me

Source code in pcwawc/webapp.py
20
21
22
23
24
25
26
27
28
def __init__(self, args, logger=None):
    """construct me"""
    self.args = args
    self.videoStream = None
    self.videoAnalyzer = VideoAnalyzer(args, logger=logger)
    self.videoAnalyzer.setUpDetector()
    self.board = self.videoAnalyzer.vision.board
    self.setDebug(args.debug)
    self.env = Environment()

getEvent()

this could be any function that blocks until data is ready

Source code in pcwawc/webapp.py
243
244
245
246
247
def getEvent(self):
    """this could be any function that blocks until data is ready"""
    time.sleep(1.0)
    s = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    return s

webchesscam

WebChessCamArgs

Bases: Args

This class parses command line arguments and generates a usage.

Source code in pcwawc/webchesscam.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
class WebChessCamArgs(Args):
    """This class parses command line arguments and generates a usage."""

    def __init__(self, argv):
        super().__init__(description="WebChessCam")
        self.parser.add_argument(
            "--port", type=int, default="5003", help="port to run server at"
        )

        self.parser.add_argument(
            "--host", default="0.0.0.0", help="host to allow access for"
        )

        self.args = self.parse(argv)

chessUpdate()

set game status from the given pgn, fen or move

Source code in pcwawc/webchesscam.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@app.route("/chess/update", methods=["GET"])
def chessUpdate():
    """set game status from the given pgn, fen or move"""
    updateGame = request.args.get("updateGame")
    updateFEN = request.args.get("updateFEN")
    updateMove = request.args.get("updateMove")
    pgn = request.args.get("pgn")
    fen = request.args.get("fen")
    move = request.args.get("move")
    if updateFEN is not None:
        return webApp.chessFEN(fen)
    elif updateGame is not None:
        return webApp.chessPgn(pgn)
    elif updateMove is not None:
        return webApp.chessMove(move)
    else:
        return webApp.index(
            "expected updateGame,updateFEN or updateMove but no such request found"
        )

yamlablemixin

YamlAbleMixin

Bases: object

allow reading and writing derived objects from a yaml file

Source code in pcwawc/yamlablemixin.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class YamlAbleMixin(object):
    """allow reading and writing derived objects from a yaml file"""

    debug = False

    # read me from a yaml file
    @staticmethod
    def readYaml(name):
        yamlFile = name
        if not yamlFile.endswith(".yaml"):
            yamlFile = yamlFile + ".yaml"
        # is there a yamlFile for the given name
        if os.path.isfile(yamlFile):
            with io.open(yamlFile, "r") as stream:
                if YamlAbleMixin.debug:
                    print("reading %s" % (yamlFile))
                result = yaml.load(stream, Loader=yaml.Loader)
                if YamlAbleMixin.debug:
                    print(result)
                return result
        else:
            return None

    # write me to my yaml file
    def writeYaml(self, name):
        yamlFile = name
        if not yamlFile.endswith(".yaml"):
            yamlFile = yamlFile + ".yaml"
        with io.open(yamlFile, "w", encoding="utf-8") as stream:
            yaml.dump(self, stream)
            if YamlAbleMixin.debug:
                print(yaml.dump(self))