File size: 5,172 Bytes
7e93a0e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import argparse

import cv2
import numpy as np

try:
    from imwatermark import WatermarkDecoder
except ImportError as e:
    try:
        # Assume some of the other dependencies such as torch are not fulfilled
        # import file without loading unnecessary libraries.
        import importlib.util
        import sys

        spec = importlib.util.find_spec("imwatermark.maxDct")
        assert spec is not None
        maxDct = importlib.util.module_from_spec(spec)
        sys.modules["maxDct"] = maxDct
        spec.loader.exec_module(maxDct)

        class WatermarkDecoder(object):
            """A minimal version of
            https://github.com/ShieldMnt/invisible-watermark/blob/main/imwatermark/watermark.py
            to only reconstruct bits using dwtDct"""

            def __init__(self, wm_type="bytes", length=0):
                assert wm_type == "bits", "Only bits defined in minimal import"
                self._wmType = wm_type
                self._wmLen = length

            def reconstruct(self, bits):
                if len(bits) != self._wmLen:
                    raise RuntimeError("bits are not matched with watermark length")

                return bits

            def decode(self, cv2Image, method="dwtDct", **configs):
                (r, c, channels) = cv2Image.shape
                if r * c < 256 * 256:
                    raise RuntimeError("image too small, should be larger than 256x256")

                bits = []
                assert method == "dwtDct"
                embed = maxDct.EmbedMaxDct(watermarks=[], wmLen=self._wmLen, **configs)
                bits = embed.decode(cv2Image)
                return self.reconstruct(bits)

    except:
        raise e


# A fixed 48-bit message that was choosen at random
# WATERMARK_MESSAGE = 0xB3EC907BB19E
WATERMARK_MESSAGE = 0b101100111110110010010000011110111011000110011110
# bin(x)[2:] gives bits of x as str, use int to convert them to 0/1
WATERMARK_BITS = [int(bit) for bit in bin(WATERMARK_MESSAGE)[2:]]
MATCH_VALUES = [
    [27, "No watermark detected"],
    [33, "Partial watermark match. Cannot determine with certainty."],
    [
        35,
        (
            "Likely watermarked. In our test 0.02% of real images were "
            'falsely detected as "Likely watermarked"'
        ),
    ],
    [
        49,
        (
            "Very likely watermarked. In our test no real images were "
            'falsely detected as "Very likely watermarked"'
        ),
    ],
]


class GetWatermarkMatch:
    def __init__(self, watermark):
        self.watermark = watermark
        self.num_bits = len(self.watermark)
        self.decoder = WatermarkDecoder("bits", self.num_bits)

    def __call__(self, x: np.ndarray) -> np.ndarray:
        """
        Detects the number of matching bits the predefined watermark with one
        or multiple images. Images should be in cv2 format, e.g. h x w x c BGR.

        Args:
            x: ([B], h w, c) in range [0, 255]

        Returns:
           number of matched bits ([B],)
        """
        squeeze = len(x.shape) == 3
        if squeeze:
            x = x[None, ...]

        bs = x.shape[0]
        detected = np.empty((bs, self.num_bits), dtype=bool)
        for k in range(bs):
            detected[k] = self.decoder.decode(x[k], "dwtDct")
        result = np.sum(detected == self.watermark, axis=-1)
        if squeeze:
            return result[0]
        else:
            return result


get_watermark_match = GetWatermarkMatch(WATERMARK_BITS)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "filename",
        nargs="+",
        type=str,
        help="Image files to check for watermarks",
    )
    opts = parser.parse_args()

    print(
        """
        This script tries to detect watermarked images. Please be aware of
        the following:
        - As the watermark is supposed to be invisible, there is the risk that
          watermarked images may not be detected.
        - To maximize the chance of detection make sure that the image has the same
          dimensions as when the watermark was applied (most likely 1024x1024
          or 512x512).
        - Specific image manipulation may drastically decrease the chance that
          watermarks can be detected.
        - There is also the chance that an image has the characteristics of the
          watermark by chance.
        - The watermark script is public, anybody may watermark any images, and
          could therefore claim it to be generated.
        - All numbers below are based on a test using 10,000 images without any
          modifications after applying the watermark.
        """
    )

    for fn in opts.filename:
        image = cv2.imread(fn)
        if image is None:
            print(f"Couldn't read {fn}. Skipping")
            continue

        num_bits = get_watermark_match(image)
        k = 0
        while num_bits > MATCH_VALUES[k][0]:
            k += 1
        print(
            f"{fn}: {MATCH_VALUES[k][1]}",
            f"Bits that matched the watermark {num_bits} from {len(WATERMARK_BITS)}\n",
            sep="\n\t",
        )