# Spaces: Running  (page-status text that accompanied this listing; not part of the program)
import base64
import io
import logging

import cv2
import easyocr
import numpy as np
from flask import Flask, request, jsonify
from PIL import Image

from routes import app
# Configure logging: INFO and above is emitted through the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the EasyOCR reader once at import time so that every call to
# recognize_digit() reuses the same instance (English only, GPU disabled).
reader = easyocr.Reader(['en'], gpu=False)
def run_length_decode(encoded_bytes):
    """Expand run-length encoded data made of ``(count, value)`` byte pairs.

    Args:
        encoded_bytes: Sequence of byte values laid out as
            ``count0, value0, count1, value1, ...``.

    Returns:
        bytes: Each ``value`` repeated ``count`` times, in order.

    Raises:
        ValueError: If the input has an odd number of bytes, i.e. the last
            count has no value to go with it.
    """
    if len(encoded_bytes) % 2 != 0:
        # Fail before decoding anything: a dangling count byte means the
        # payload is truncated or not run-length encoded at all.
        raise ValueError("Run-length encoded data length is not even.")

    decoded = bytearray()
    # Even positions hold counts, odd positions hold the byte to repeat.
    for count, value in zip(encoded_bytes[0::2], encoded_bytes[1::2]):
        decoded.extend(bytes((value,)) * count)
    return bytes(decoded)
def remove_green_lines(cv_image, lower_green=(40, 40, 40), upper_green=(80, 255, 255)):
    """Blank out green pixels of a BGR image.

    The image is converted to HSV and every pixel whose (H, S, V) value lies
    inside ``[lower_green, upper_green]`` is masked out; masked-out pixels
    come back as zero (black) in the result.

    Args:
        cv_image: Image in OpenCV BGR channel order.
        lower_green: Inclusive lower HSV bound of the colour range to remove.
        upper_green: Inclusive upper HSV bound of the colour range to remove.

    Returns:
        A copy of ``cv_image`` with the selected colour range removed.
    """
    hsv = cv2.cvtColor(cv_image, cv2.COLOR_BGR2HSV)
    green_mask = cv2.inRange(hsv, np.array(lower_green), np.array(upper_green))
    # Invert so the mask selects everything that is NOT green.
    keep_mask = cv2.bitwise_not(green_mask)
    return cv2.bitwise_and(cv_image, cv_image, mask=keep_mask)
def preprocess_image(cv_image):
    """Turn a BGR image into an inverted binary mask for contour detection.

    Steps: grayscale -> 7x7 Gaussian blur -> adaptive Gaussian threshold
    (block size 15, C=3) -> inversion -> 5x5 morphological closing.

    Args:
        cv_image: Image in OpenCV BGR channel order.

    Returns:
        Single-channel binary image produced by the steps above.
    """
    gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (7, 7), 0)
    binary = cv2.adaptiveThreshold(
        blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, blockSize=15, C=3)
    # Invert so that pixels that were below the local threshold become 255.
    inverted = cv2.bitwise_not(binary)
    # Closing fills small gaps in the 255-valued regions.
    kernel = np.ones((5, 5), np.uint8)
    return cv2.morphologyEx(inverted, cv2.MORPH_CLOSE, kernel)
def _order_corners(pts):
    """Order four (x, y) points as top-left, top-right, bottom-right, bottom-left."""
    rect = np.zeros((4, 2), dtype="float32")
    sums = pts.sum(axis=1)
    rect[0] = pts[np.argmin(sums)]   # smallest x + y -> top-left
    rect[2] = pts[np.argmax(sums)]   # largest x + y  -> bottom-right
    diffs = np.diff(pts, axis=1)     # y - x for every point
    rect[1] = pts[np.argmin(diffs)]  # smallest y - x -> top-right
    rect[3] = pts[np.argmax(diffs)]  # largest y - x  -> bottom-left
    return rect


def _infer_grid_size(image_size, emptyCells):
    """Guess the board size (4, 9 or 16) from the image size and cell coordinates."""
    size = 4  # Default to 4x4
    if image_size is not None and image_size > 40000:
        size = 9 if image_size < 100000 else 16

    # Secondary check: a requested cell must fit on the board. Coordinates are
    # treated as 0-based (they are used directly as list indices when summing),
    # so a coordinate of N needs a board of at least N + 1 cells per side.
    largest = max(
        (max(cell.get('x', 0), cell.get('y', 0)) for cell in emptyCells or ()),
        default=-1,
    )
    for candidate in (4, 9, 16):
        if largest < candidate:
            return max(size, candidate)
    return 16


def extract_sudoku_board(image, emptyCells, image_size):
    """Locate the Sudoku grid in an image and read every cell.

    The largest external contour is assumed to be the grid outline. It is
    simplified to a quadrilateral, perspective-corrected, resized to a square
    and cut into ``size x size`` equal cells, each of which is passed to
    ``recognize_digit``.

    Args:
        image: PIL image (or an array already in RGB order) of the puzzle.
        emptyCells: Iterable of dicts with ``'x'`` / ``'y'`` keys; used only as
            a hint for the board size.
        image_size: Numeric size hint used to pick between 4x4, 9x9 and 16x16.

    Returns:
        tuple: ``(board, size)`` where ``board`` is a ``size x size`` list of
        lists of ints as returned by ``recognize_digit``.

    Raises:
        ValueError: If no contour is found, the outline does not simplify to
            four corners, or the detected grid is too small to split.
    """
    # Palette / alpha / grayscale images would break the RGB->BGR conversion.
    if hasattr(image, "convert"):
        image = image.convert("RGB")
    cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    cv_image = remove_green_lines(cv_image)
    thresh = preprocess_image(cv_image)

    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) == 0:
        raise ValueError("No contours found; cannot locate the Sudoku grid.")
    largest_contour = max(contours, key=cv2.contourArea)

    peri = cv2.arcLength(largest_contour, True)
    approx = cv2.approxPolyDP(largest_contour, 0.01 * peri, True)
    if len(approx) != 4:
        raise ValueError(
            f"Expected a 4-corner grid outline, found {len(approx)} corners.")

    rect = _order_corners(approx.reshape(4, 2))
    (tl, tr, br, bl) = rect

    widthA = np.linalg.norm(br - bl)
    widthB = np.linalg.norm(tr - tl)
    maxWidth = max(int(widthA), int(widthB))
    heightA = np.linalg.norm(tr - br)
    heightB = np.linalg.norm(tl - bl)
    maxHeight = max(int(heightA), int(heightB))

    dst = np.array([
        [0, 0],
        [maxWidth - 1, 0],
        [maxWidth - 1, maxHeight - 1],
        [0, maxHeight - 1],
    ], dtype="float32")
    M = cv2.getPerspectiveTransform(rect, dst)
    warp = cv2.warpPerspective(cv_image, M, (maxWidth, maxHeight))

    # Determine Sudoku size based on image dimensions and emptyCells
    size = _infer_grid_size(image_size, emptyCells)

    # Calculate cell size
    cell_size = maxWidth // size
    if cell_size == 0:
        raise ValueError("Detected grid is too small to split into cells.")

    # Resize the warped grid to a square that divides evenly into cells
    expected_size = size * cell_size
    warp = cv2.resize(warp, (expected_size, expected_size))

    # Convert warped image to grayscale and threshold
    warp_gray = cv2.cvtColor(warp, cv2.COLOR_BGR2GRAY)
    warp_blurred = cv2.GaussianBlur(warp_gray, (5, 5), 0)
    warp_thresh = cv2.adaptiveThreshold(
        warp_blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 11, 2)
    warp_thresh = cv2.bitwise_not(warp_thresh)

    board = []
    for y in range(size):
        top, bottom = y * cell_size, (y + 1) * cell_size
        row = []
        for x in range(size):
            left, right = x * cell_size, (x + 1) * cell_size
            row.append(recognize_digit(warp_thresh[top:bottom, left:right], size))
        board.append(row)
    return board, size
def count_connected_black_pixels(image, min_pixels=14):
    """Count sizeable connected regions in a binary cell image.

    The image is resized to 100x100 and labelled with 8-connectivity. Every
    label (including label 0, the zero-valued pixels) that covers at least
    ``min_pixels`` pixels is counted, then 2 is subtracted from the count.

    NOTE(review): the fixed offset of 2 presumably discounts label 0 plus the
    255-valued padding frame that recognize_digit() adds around the cell --
    confirm before changing either function.

    Args:
        image: Single-channel 8-bit image; non-zero pixels are the ones that
            get grouped into components.
        min_pixels: Minimum number of pixels for a label to be counted.

    Returns:
        int: Number of counted labels minus 2, never less than 0.
    """
    image = cv2.resize(image, (100, 100))
    num_labels, labels_im = cv2.connectedComponents(image, connectivity=8)
    # One pass over the label image gives the pixel count of every label.
    label_sizes = np.bincount(labels_im.ravel(), minlength=num_labels)
    valid_components = int(np.count_nonzero(label_sizes >= min_pixels))
    return max(valid_components - 2, 0)
# Test the function on a preprocessed image | |
def recognize_digit(cell_image, slots=4):
    """
    Recognizes the digit in a Sudoku cell. If no digit is detected, it counts black pixel groups (dots).

    Args:
        cell_image: Single-channel image of one cell.
        slots: Board size (cells per side); selects how far the cell is
            upscaled before OCR (4 -> 100px, 9 -> 300px, anything else -> 500px).

    Returns:
        int: The OCR-recognised number, otherwise the value returned by
        count_connected_black_pixels() (0 when nothing is found).
    """
    # Preprocess cell image for digit detection
    side = {4: 100, 9: 300}.get(slots, 500)
    cell_resized = cv2.resize(cell_image, (side, side))
    cell_padded = cv2.copyMakeBorder(
        cell_resized, 10, 10, 10, 10, cv2.BORDER_CONSTANT, value=255)
    cell_rgb = cv2.cvtColor(cell_padded, cv2.COLOR_GRAY2RGB)

    # Use EasyOCR to detect text, restricted to digit characters
    result = reader.readtext(cell_rgb, detail=0, allowlist='0123456789')
    if result:
        # Several fragments may be returned; the longest one is used.
        digit = max(result, key=len)
        if digit.isdigit():
            return int(digit)

    # If no digit is detected, count connected pixel components (dots)
    logger.info("No digit detected. Counting black pixel groups.")
    dots_count = count_connected_black_pixels(cell_padded)
    logger.info("Detected %s connected black pixel groups (dots).", dots_count)
    return max(dots_count, 0)  # Default to 0 if nothing is recognized
def is_valid_general(board, row, col, num, size):
    """Tell whether ``num`` can be placed at ``board[row][col]``.

    Args:
        board: ``size x size`` grid indexed as ``board[row][col]``.
        row: Row index of the candidate cell.
        col: Column index of the candidate cell.
        num: Value to test.
        size: Number of cells per side; sub-grids are ``sqrt(size)`` wide.

    Returns:
        bool: False if ``num`` already appears in the same row, column or
        sub-grid, True otherwise.
    """
    # Check row and column
    if any(board[row][k] == num or board[k][col] == num for k in range(size)):
        return False

    # Check subgrid
    subgrid_size = int(np.sqrt(size))
    start_row = subgrid_size * (row // subgrid_size)
    start_col = subgrid_size * (col // subgrid_size)
    return not any(
        board[y][x] == num
        for y in range(start_row, start_row + subgrid_size)
        for x in range(start_col, start_col + subgrid_size)
    )
def solve_sudoku_general(board, size):
    """Solve a Sudoku board in place by backtracking.

    Empty cells (value 0) are filled in row-major order, trying candidates
    ``1..size`` in ascending order. The empty cells are collected once and
    the values used in each row, column and sub-grid are tracked in sets, so
    a candidate is checked in constant time instead of rescanning the grid.

    Args:
        board: ``size x size`` list of lists; 0 marks an empty cell. Modified
            in place.
        size: Number of cells per side; sub-grids are ``sqrt(size)`` wide.

    Returns:
        bool: True if every empty cell was filled; False if no assignment
        exists, in which case the empty cells are left as 0.
    """
    box = int(np.sqrt(size))
    row_used = [set() for _ in range(size)]
    col_used = [set() for _ in range(size)]
    box_used = {}

    empties = []
    for r in range(size):
        for c in range(size):
            value = board[r][c]
            if value == 0:
                empties.append((r, c))
            else:
                row_used[r].add(value)
                col_used[c].add(value)
                box_used.setdefault((r // box, c // box), set()).add(value)

    # Resolve the three constraint sets of every empty cell once, up front.
    slots = [
        (r, c, row_used[r], col_used[c],
         box_used.setdefault((r // box, c // box), set()))
        for r, c in empties
    ]

    def place(index):
        if index == len(slots):
            return True
        r, c, in_row, in_col, in_box = slots[index]
        for num in range(1, size + 1):
            if num in in_row or num in in_col or num in in_box:
                continue
            board[r][c] = num
            in_row.add(num)
            in_col.add(num)
            in_box.add(num)
            if place(index + 1):
                return True
            # Undo the tentative placement before trying the next candidate.
            board[r][c] = 0
            in_row.discard(num)
            in_col.discard(num)
            in_box.discard(num)
        return False

    return place(0)
def calculate_sum(board, empty_cells):
    """Add up the board values at the requested cells.

    Args:
        board: Grid indexed as ``board[y][x]``.
        empty_cells: Iterable of dicts with 0-based ``'x'`` (column) and
            ``'y'`` (row) keys. ``None`` is treated as no cells.

    Returns:
        Sum of ``board[y][x]`` over all requested cells (0 when there are none).

    Raises:
        ValueError: If a cell lacks integer coordinates or points outside the
            board. Negative indices are rejected rather than wrapping around.
    """
    total = 0
    for cell in empty_cells or ():
        x = cell.get('x')
        y = cell.get('y')
        # Validate the coordinates
        if not (isinstance(x, int) and isinstance(y, int)):
            raise ValueError(f"Cell coordinates must be integers, got {cell!r}")
        if not (0 <= y < len(board) and 0 <= x < len(board[y])):
            raise ValueError(f"Cell coordinates out of range: x={x}, y={y}")
        total += board[y][x]
    return total
def sudoku_endpoint():
    """Handle a Sudoku request: decode the image, read the board, solve it.

    Expects a JSON object with the keys ``id``, ``encoded`` (base64 of the
    run-length encoded image bytes), ``imgLength`` (numeric size hint) and
    ``emptyCells`` (list of ``{'x': ..., 'y': ...}`` dicts).

    Returns:
        A JSON response ``{"answer": board, "sum": total}``. If the request
        body is not a JSON object or a required field is missing, a JSON
        error message with status 400 is returned instead.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    # Extract fields
    sudoku_id = payload.get('id')
    encoded_str = payload.get('encoded')
    img_length = payload.get('imgLength')
    empty_cells = payload.get('emptyCells') or []
    # Log only identifying details; the encoded image can be very large.
    logger.info("Received Sudoku request: id=%s, emptyCells=%d",
                sudoku_id, len(empty_cells))

    if not encoded_str:
        return jsonify({"error": "Missing 'encoded' image data."}), 400
    if isinstance(img_length, bool) or not isinstance(img_length, (int, float)):
        return jsonify({"error": "'imgLength' must be a number."}), 400

    # Base64 Decode First
    run_length_encoded_bytes = base64.b64decode(encoded_str)
    logger.info("Base64 decoding successful.")

    # Run-Length Decode Second
    image_bytes = run_length_decode(run_length_encoded_bytes)
    logger.info("Run-length decoding successful.")

    # Open image using PIL
    image = Image.open(io.BytesIO(image_bytes))
    logger.info("Image opened successfully.")

    # Extract Sudoku board
    board, size = extract_sudoku_board(image, empty_cells, img_length)
    logger.info("Sudoku board extracted successfully. Size: %sx%s", size, size)

    # Solve Sudoku
    logger.info("Sudoku board before solving:")
    for row in board:
        logger.info(row)
    solved = solve_sudoku_general(board, size)
    if solved:
        logger.info("Sudoku solved successfully.")
    else:
        # The board is still returned as extracted so the response shape
        # stays the same; the log makes the failure visible.
        logger.warning("No solution found for Sudoku ID: %s", sudoku_id)

    total_sum = calculate_sum(board, empty_cells)
    logger.info("Sum of specified cells: %s", total_sum)

    # Prepare response
    response = {
        "answer": board,
        "sum": total_sum,
    }
    logger.info("Response prepared successfully for Sudoku ID: %s", sudoku_id)
    return jsonify(response)