Working on cleaning

This commit is contained in:
Thomas Forgione 2024-08-23 15:17:09 +02:00
parent 4889aeaf7c
commit 84b5703f64
13 changed files with 814 additions and 766 deletions

View File

@ -1,9 +1,7 @@
from flask import Flask, redirect, request, render_template, send_from_directory, session
import json
from flask import Flask, redirect, render_template, send_from_directory, session
import os
from os.path import join
import sqlite3
from . import db, config, scanner, calibration, archive
from . import db, config, scanner, routes, utils
app = Flask(__name__)
@ -19,22 +17,25 @@ except ImportError:
app.config['SECRET_KEY'] = secret
def get_calibration(conn: sqlite3.Connection) -> db.Calibration:
calibration_id = session.get('calibration_id', None)
if calibration_id is None:
return db.Calibration.Dummy
return db.Calibration.get_from_id(calibration_id, conn)
# Middlewares to help us deal with stuff
@app.context_processor
def inject_stage_and_region():
def inject():
"""
Returns a dictionnary with the uuids of leds and the calibration state.
"""
conn = db.get()
return dict(calibration=get_calibration(conn), leds=config.LEDS_UUIDS, CalibrationState=db.CalibrationState)
return {
'calibration': utils.get_calibration(conn),
'leds': config.LEDS_UUIDS,
'CalibrationState': db.CalibrationState,
}
@app.before_request
def manage_auto_use_last_calibration():
"""
Automatically use the last calibration if the config is set accordingly.
"""
if config.AUTO_USE_LAST_CALIBRATION and 'calibration_id' not in session:
conn = db.get()
last = db.Calibration.get_last(conn)
@ -42,34 +43,7 @@ def manage_auto_use_last_calibration():
session['calibration_id'] = last.id
@app.route("/")
def index():
conn = db.get()
projects = db.Object.all_by_project(conn)
return render_template('index.html', projects=projects)
@app.route("/create-object/", methods=["POST"])
def create_object():
conn = db.get()
with conn:
db.Object.create(request.form.get('name'), request.form.get('project'), conn)
return redirect('/')
@app.route('/object/<id>')
def object(id: int):
conn = db.get()
object = db.Object.get_from_id(id, conn).full(conn)
return render_template('object.html', object=object)
@app.route('/delete-object/<id>')
def delete_object(id: int):
conn = db.get()
with conn:
db.Object.delete_from_id(id, conn)
return redirect('/')
app.register_blueprint(routes.blueprint)
@app.route('/scan/<id>')
@ -93,67 +67,6 @@ def scan_existing(id: int):
return render_template('scan.html', object=object, acquisition=acquisition, calibrated=calibrated)
@app.route("/calibrate/")
def calibrate():
conn = db.get()
if 'calibration_id' not in session:
with conn:
calibration = db.Calibration.create(conn)
session['calibration_id'] = calibration.id
else:
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
if calibration.state in [db.CalibrationState.Empty, db.CalibrationState.HasData]:
return render_template('calibrate.html')
else:
return render_template('calibration.html', calibration=calibration)
@app.route("/new-calibration")
def new_calibration():
conn = db.get()
with conn:
calibration = db.Calibration.create(conn)
session['calibration_id'] = calibration.id
return redirect('/calibrate')
@app.route("/cancel-calibration")
def cancel_calibration():
conn = db.get()
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
calibration.state = db.CalibrationState.HasData
with conn:
calibration.save(conn)
return redirect('/calibrate')
@app.route("/api/scan-for-calibration")
def scan_calibration():
conn = db.get()
if 'calibration_id' not in session:
with conn:
calibration = db.Calibration.create(conn)
calibration_id = str(calibration.id)
session['calibration_id'] = calibration.id
else:
calibration_id = str(session['calibration_id'])
calibration = get_calibration(conn)
def generate():
length = len(config.LEDS_UUIDS)
for index, led_uuid in enumerate(scanner.scan(join(config.CALIBRATION_DIR, calibration_id))):
yield f"{led_uuid},{(index+1)/length}\n"
with conn:
calibration.state = db.CalibrationState.HasData
calibration.save(conn)
return app.response_class(generate(), mimetype='text/plain')
@app.route("/api/scan-for-object/<object_id>")
def scan_object(object_id: int):
conn = db.get()
@ -227,53 +140,6 @@ def delete_acquisition(acquisition_id: int):
return redirect('/object/' + str(acqusition.object_id))
@app.route('/use-last-calibration')
def use_last_calibration():
conn = db.get()
calibration = db.Calibration.get_last(conn)
session['calibration_id'] = calibration.id
return redirect('/calibrate')
@app.route('/api/use-last-calibration')
def api_use_last_calibration():
conn = db.get()
calibration = db.Calibration.get_last(conn)
session['calibration_id'] = calibration.id
return 'ok'
@app.route("/api/calibrate")
def run_calibration():
conn = db.get()
id = session['calibration_id']
calib = db.Calibration.get_from_id(id, conn)
if calib is None:
return 'oops', 404
calibration_json = calibration.calibrate(join(config.CALIBRATION_DIR, str(id)))
with open(join(config.CALIBRATION_DIR, str(id), 'calibration.json'), 'w') as f:
json.dump(calibration_json, f, indent=4)
with conn:
calib.state = db.CalibrationState.IsComputed
calib.save(conn)
return 'ok'
@app.route('/validate-calibration')
def validate_calibration():
conn = db.get()
calib = get_calibration(conn)
if calib is None:
return 'oops', 404
with conn:
calib.validate(conn)
return redirect('/')
@app.route('/static/<path:path>')
def send_static(path):
return send_from_directory('static', path)
@ -282,6 +148,3 @@ def send_static(path):
@app.route('/data/<path:path>')
def send_data(path):
return send_from_directory('data', path)
app.register_blueprint(archive.blueprint)

View File

@ -1,14 +1,11 @@
import builtins
from datetime import datetime
from flask import Response, Blueprint
from flask import Response
import functools
import itertools
import os
from os.path import join
import zlib
from typing import Optional
import time
from . import db, config
# Chunks for crc 32 computation
CRC32_CHUNK_SIZE = 65_536
@ -417,63 +414,3 @@ class ZipSender(ArchiveSender):
def archive_name(self) -> str:
return 'archive.zip'
def download_object(id: int, archive: ArchiveSender):
"""
Helper for routes that send archives.
"""
conn = db.get()
object = db.Object.get_from_id(id, conn).full(conn)
# Group acquisitions sharing calibration
def keyfunc(x: db.Calibration) -> int:
return x.calibration_id
acquisitions_sorted = sorted(object.acquisitions, key=keyfunc)
acquisitions_grouped = [
(db.Calibration.get_from_id(k, conn), list(g))
for k, g in itertools.groupby(acquisitions_sorted, key=keyfunc)
]
# Create archive file to send
for calibration_index, (calib, acquisitions) in enumerate(acquisitions_grouped):
calibration_dir = join(config.CALIBRATION_DIR, str(calib.id))
# Add calibration images
for image in os.listdir(calibration_dir):
archive.add_file(
f'object/{calibration_index}/calibration/{image}',
join(calibration_dir, image)
)
# Add each acquisition
for acquisition_index, acquisition in enumerate(acquisitions):
acquisition_dir = join(config.OBJECT_DIR, str(object.id), str(acquisition.id))
for image in os.listdir(acquisition_dir):
archive.add_file(
f'object/{calibration_index}/{acquisition_index}/{image}',
join(acquisition_dir, image)
)
return archive.response()
blueprint = Blueprint('archive', __name__)
@blueprint.route('/download-object/tar/<id>')
def download_object_tar(id: int):
"""
Downloads an object as a tar archive.
"""
return download_object(id, TarSender())
@blueprint.route('/download-object/zip/<id>')
def download_object_zip(id: int):
"""
Downloads an object as a zip archive.
"""
return download_object(id, ZipSender())

View File

@ -7,7 +7,7 @@ import os
import sys
from PIL import Image
from . import utils
from . import math_utils
# To extract a few images and resize them at 20% of their size:
@ -29,7 +29,7 @@ def calibrate(input_dir: str):
focal_mm = 35
matrix_size = 24
focal_pix = nu * focal_mm / matrix_size
K = utils.build_K_matrix(focal_pix, nu/2, nv/2)
K = math_utils.build_K_matrix(focal_pix, nu/2, nv/2)
# Max image: image of brightest pixels, helps spheres segmentation
max_image = functools.reduce(np.maximum, images)
@ -41,19 +41,19 @@ def calibrate(input_dir: str):
init_params = np.ones(2), np.broadcast_to(np.eye(3) * 0.1, (2, 3, 3)), np.asarray([[0, 0, 0], [1, 1, 1]])
# Estimate GMM parameters and classify pixels
estimated_params = utils.gaussian_mixture_estimation(pixels, init_params, it=10)
classif = np.asarray(utils.maximum_likelihood(pixels, estimated_params), dtype=bool)
estimated_params = math_utils.gaussian_mixture_estimation(pixels, init_params, it=10)
classif = np.asarray(math_utils.maximum_likelihood(pixels, estimated_params), dtype=bool)
# Refine classification to select the appropriate binary mask
rectified_classif = utils.select_binary_mask(classif, lambda mask: np.mean(pixels[mask]))
rectified_classif = math_utils.select_binary_mask(classif, lambda mask: np.mean(pixels[mask]))
# Identify the largest connected components (spheres) and extract their borders
sphere_masks = utils.get_greatest_components(np.reshape(rectified_classif, (nu, nv)), nspheres)
border_masks = np.vectorize(utils.get_mask_border, signature='(u,v)->(u,v)')(sphere_masks)
sphere_masks = math_utils.get_greatest_components(np.reshape(rectified_classif, (nu, nv)), nspheres)
border_masks = np.vectorize(math_utils.get_mask_border, signature='(u,v)->(u,v)')(sphere_masks)
# Fit quadratic forms (ellipses) to the borders
def fit_on_mask(border):
return utils.fit_quadratic_form(utils.to_homogeneous(np.argwhere(border)))
return math_utils.fit_quadratic_form(math_utils.to_homogeneous(np.argwhere(border)))
ellipse_quadratics = np.vectorize(fit_on_mask, signature='(u,v)->(t,t)')(border_masks)
@ -61,18 +61,18 @@ def calibrate(input_dir: str):
calibrated_quadratics = np.swapaxes(K, -1, -2) @ ellipse_quadratics @ K
# Deproject the ellipse quadratics to sphere centers
sphere_centers = utils.deproject_ellipse_to_sphere(calibrated_quadratics, 1)
sphere_centers = math_utils.deproject_ellipse_to_sphere(calibrated_quadratics, 1)
# Create coordinates and calculate camera rays
coordinates = np.stack(np.meshgrid(range(nu), range(nv), indexing='ij'), axis=-1)
rays = utils.get_camera_rays(coordinates, K)
rays = math_utils.get_camera_rays(coordinates, K)
# Find the intersections between the camera rays and the spheres
sphere_points_map, sphere_geometric_masks = \
utils.line_sphere_intersection(sphere_centers[:, np.newaxis, np.newaxis, :], 1, rays[np.newaxis, :, :, :])
math_utils.line_sphere_intersection(sphere_centers[:, np.newaxis, np.newaxis, :], 1, rays[np.newaxis, :, :, :])
sphere_points = np.asarray([sphere_points_map[i, sphere_geometric_masks[i]] for i in range(nspheres)], dtype=object)
sphere_normals = np.vectorize(utils.sphere_intersection_normal, signature='(v),()->()', otypes=[object])(sphere_centers, sphere_points)
sphere_normals = np.vectorize(math_utils.sphere_intersection_normal, signature='(v),()->()', otypes=[object])(sphere_centers, sphere_points)
# Load grey values from images for the identified sphere regions
def to_grayscale(image):
@ -82,17 +82,17 @@ def calibrate(input_dir: str):
# Estimate lighting conditions from sphere normals and grey values
estimated_lights = np.vectorize(
utils.estimate_light,
math_utils.estimate_light,
excluded=(2,),
signature='(),()->(k)',
otypes=[float]
)(sphere_normals, grey_values, (0.1, 0.9))
# Calculate the positions of the light sources
light_positions = utils.lines_intersections(sphere_centers, estimated_lights)
light_positions = math_utils.lines_intersections(sphere_centers, estimated_lights)
# Calculate plane parameters from the sphere centers and intersect camera rays with the plane
plane_normal, plane_alpha = utils.plane_parameters_from_points(sphere_centers)
plane_normal, plane_alpha = math_utils.plane_parameters_from_points(sphere_centers)
# Return value as dictionnary
return {

524
math_utils.py Normal file
View File

@ -0,0 +1,524 @@
import numpy as np
import scipy.ndimage as ndimage
def dot_product(v1, v2):
"""Computes the dot product between two arrays of vectors.
Args:
v1 (Array ..., ndim): First array of vectors.
v2 (Array ..., ndim): Second array of vectors.
Returns:
Array ...: Dot product between v1 and v2.
"""
result = np.einsum('...i,...i->...', v1, v2)
return result
def norm_vector(v):
"""computes the norm and direction of vectors
Args:
v (Array ..., dim): vectors to compute the norm and direction for
Returns:
Array ...: norms of the vectors
Array ..., dim: unit direction vectors
"""
norm = np.linalg.norm(v, axis=-1)
direction = v/norm[..., np.newaxis]
return norm, direction
def to_homogeneous(v):
"""converts vectors to homogeneous coordinates
Args:
v (Array ..., dim): input vectors
Returns:
Array ..., dim+1: homogeneous coordinates of the input vectors
"""
append_term = np.ones(np.shape(v)[:-1] + (1,))
homogeneous = np.append(v, append_term, axis=-1)
return homogeneous
def cross_to_skew_matrix(v):
"""converts a vector cross product to a skew-symmetric matrix multiplication
Args:
v (Array ..., 3): vectors to convert
Returns:
Array ..., 3, 3: matrices corresponding to the input vectors
"""
indices = np.asarray([[-1, 2, 1], [2, -1, 0], [1, 0, -1]])
signs = np.asarray([[0, -1, 1], [1, 0, -1], [-1, 1, 0]])
skew_matrix = v[..., indices] * signs
return skew_matrix
def build_K_matrix(focal_length, u0, v0):
"""
Build the camera intrinsic matrix.
Parameters:
focal_length (float): Focal length of the camera.
u0 (float): First coordinate of the principal point.
v0 (float): Seccond coordinate of the principal point.
Returns:
numpy.ndarray: Camera intrinsic matrix (3x3).
"""
K = np.asarray([[focal_length, 0, u0],
[0, focal_length, v0],
[0, 0, 1]])
return K
def get_camera_rays(points, K):
"""Computes the camera rays for a set of points given the camera matrix K.
Args:
points (Array ..., 2): Points in the image plane.
K (Array 3, 3): Camera intrinsic matrix.
Returns:
Array ..., 3: Camera rays corresponding to the input points.
"""
homogeneous = to_homogeneous(points)
inv_K = np.linalg.inv(K)
rays = np.einsum('ij,...j->...i', inv_K, homogeneous)
return rays
def matrix_kernel(A):
"""Computes the eigenvector corresponding to the smallest eigenvalue of the matrix A.
Args:
A (Array ..., n, n): Input square matrix.
Returns:
Array ..., n: Eigenvector corresponding to the smallest eigenvalue.
"""
eigval, eigvec = np.linalg.eig(A)
min_index = np.argmin(np.abs(eigval), axis=-1)
min_eigvec = np.take_along_axis(eigvec, min_index[..., None, None], -1)[..., 0]
normed_eigvec = norm_vector(min_eigvec)[1]
return normed_eigvec
def evaluate_bilinear_form(Q, left, right):
"""evaluates bilinear forms at several points
Args:
Q (Array ...,ldim,rdim): bilinear form to evaluate
left (Array ...,ldim): points where the bilinear form is evaluated to the left
right (Array ...,rdim): points where the bilinear form is evaluated to the right
Returns:
Array ... bilinear forms evaluated
"""
result = np.einsum('...ij,...i,...j->...', Q, left, right)
return result
def evaluate_quadratic_form(Q, points):
"""evaluates quadratic forms at several points
Args:
Q (Array ...,dim,dim): quadratic form to evaluate
points (Array ...,dim): points where the quadratic form is evaluated
Returns:
Array ... quadratic forms evaluated
"""
result = evaluate_bilinear_form(Q, points, points)
return result
def merge_quadratic_to_homogeneous(Q, b, c):
"""merges quadratic form, linear term, and constant term into a homogeneous matrix
Args:
Q (Array ..., dim, dim): quadratic form matrix
b (Array ..., dim): linear term vector
c (Array ...): constant term
Returns:
Array ..., dim+1, dim+1: homogeneous matrix representing the quadratic form
"""
dim_points = Q.shape[-1]
stack_shape = np.broadcast_shapes(np.shape(Q)[:-2], np.shape(b)[:-1], np.shape(c))
Q_b = np.broadcast_to(Q, stack_shape + (dim_points, dim_points))
b_b = np.broadcast_to(np.expand_dims(b, -1), stack_shape+(dim_points, 1))
c_b = np.broadcast_to(np.expand_dims(c, (-1, -2)), stack_shape + (1, 1))
H = np.block([[Q_b, 0.5 * b_b], [0.5 * np.swapaxes(b_b, -1, -2), c_b]])
return H
def quadratic_to_dot_product(points):
"""computes the matrix W such that
x.T@Ax = W(x).T*A[ui,uj]
Args:
points ( Array ...,ndim): points of dimension ndim
Returns:
Array ...,ni: dot product matrix (W)
Array ni: i indices of central matrix
Array ni: j indices of central matrix
"""
dim_points = points.shape[-1]
ui, uj = np.triu_indices(dim_points)
W = points[..., ui] * points[..., uj]
return W, ui, uj
def fit_quadratic_form(points):
"""Fits a quadratic form to the given zeroes.
Args:
points (Array ..., n, dim): Input points.
Returns:
Array ..., dim, dim: Fitted quadratic form matrix.
"""
dim_points = points.shape[-1]
normed_points = norm_vector(points)[1]
W, ui, uj = quadratic_to_dot_product(normed_points)
H = np.einsum('...ki,...kj->...ij', W, W)
V0 = matrix_kernel(H)
Q = np.zeros(V0.shape[:-1] + (dim_points, dim_points))
Q[..., ui, uj] = V0
return Q
def gaussian_pdf(mu, sigma, x):
"""Computes the PDF of a multivariate Gaussian distribution.
Args:
mu (Array ...,k): Mean vector.
sigma (Array ...,k,k): Covariance matrix.
x (Array ...,k): Input vector.
Returns:
Array ...: Value of the PDF.
"""
k = np.shape(x)[-1]
Q = np.linalg.inv(sigma)
normalization = np.reciprocal(np.sqrt(np.linalg.det(sigma) * np.power(2.0 * np.pi, k)))
quadratic = evaluate_quadratic_form(Q, x - mu)
result = np.exp(-0.5 * quadratic) * normalization
return result
def gaussian_estimation(x, weights):
"""Estimates the mean and covariance matrix of a Gaussian distribution.
Args:
x (Array ...,n,dim): Data points.
weights (Array ...,n): Weights for each data point.
Returns:
Array ...,dim: Estimated mean vector.
Array ...,dim,dim: Estimated covariance matrix.
"""
weights_sum = np.sum(weights, axis=-1)
mu = np.sum(x*np.expand_dims(weights, axis=-1), axis=-2) / np.expand_dims(weights_sum, axis=-1)
centered_x = x - np.expand_dims(mu, axis=-2)
sigma = np.einsum('...s, ...si, ...sj->...ij', weights, centered_x, centered_x)/np.expand_dims(weights_sum, axis=(-1, -2))
return mu, sigma
def gaussian_mixture_estimation(x, init_params, it=100):
"""Estimates the parameters of a k Gaussian mixture model using the EM algorithm.
Args:
x (Array ..., n, dim): Data points.
init_params (tuple): Initial parameters (pi, sigma, mu).
pi (Array ..., k): Initial mixture weights.
sigma (Array ..., k, dim, dim): Initial covariance matrices.
mu (Array ..., k, dim): Initial means.
it (int, optional): Number of iterations. Defaults to 100.
Returns:
Tuple[(Array ..., k), (Array ..., k, dim, dim), (Array ..., k, dim)]:
Estimated mixture weights,covariance matrices, means.
"""
pi, sigma, mu = init_params
for _ in range(it):
pdf = gaussian_pdf(
np.expand_dims(mu, axis=-2),
np.expand_dims(sigma, axis=-3),
np.expand_dims(x, axis=-3)
) * np.expand_dims(pi, axis=-1)
weights = pdf/np.sum(pdf, axis=-2, keepdims=True)
pi = np.mean(weights, axis=-1)
mu, sigma = gaussian_estimation(x, weights)
return pi, sigma, mu
def maximum_likelihood(x, params):
"""Selects the best gaussian model for a point
Args:
x (Array ..., dim): Data points.
params (tuple): Gaussians parameters (pi, sigma, mu).
pi (Array ..., k): Mixture weights.
sigma (Array ..., k, dim, dim): Covariance matrices.
mu (Array ..., k, dim): Means.
Returns:
Array ...: integer in [0,k-1] giving the maximum likelihood model
"""
pi, sigma, mu = params
pdf = gaussian_pdf(mu, sigma, np.expand_dims(x, axis=-2))*pi
result = np.argmax(pdf, axis=-1)
return result
def get_greatest_components(mask, n):
"""
Extract the n largest connected components from a binary mask.
Parameters:
mask (Array ...): The binary mask.
n (int): The number of largest connected components to extract.
Returns:
Array n,...: A boolean array of the n largest connected components
"""
labeled, _ = ndimage.label(mask)
unique, counts = np.unique(labeled, return_counts=True)
greatest_labels = unique[unique != 0][np.argsort(counts[unique != 0])[-n:]]
greatest_components = labeled[np.newaxis, ...] == np.expand_dims(greatest_labels, axis=tuple(range(1, 1 + mask.ndim)))
return greatest_components
def get_mask_border(mask):
"""
Extract the border from a binary mask.
Parameters:
mask (Array ...): The binary mask.
Returns:
Array ...: A boolean array mask of the border
"""
inverted_mask = np.logical_not(mask)
dilated = ndimage.binary_dilation(inverted_mask)
border = np.logical_and(mask, dilated)
return border
def select_binary_mask(mask, metric):
"""Selects the side of a binary mask that optimizes the given metric.
Args:
mask (Array bool ...): Initial binary mask.
metric (function): Function to evaluate the quality of the mask.
Returns:
Array bool ...: Selected binary mask that maximizes the metric.
"""
inverted = np.logical_not(mask)
result = mask if metric(mask) > metric(inverted) else inverted
return result
def deproject_ellipse_to_sphere(M, radius):
"""finds the deprojection of an ellipse to a sphere
Args:
M (Array 3,3): Ellipse quadratic form
radius (float): radius of the researched sphere
Returns:
Array 3: solution of sphere centre location
"""
H = 0.5 * (np.swapaxes(M, -1, -2) + M)
eigval, eigvec = np.linalg.eigh(H)
i_unique = np.argmax(np.abs(np.median(eigval, axis=-1, keepdims=True) - eigval), axis=-1)
unique_eigval = np.take_along_axis(eigval, i_unique[..., None], -1)[..., 0]
unique_eigvec = np.take_along_axis(eigvec, i_unique[..., None, None], -1)[..., 0]
double_eigval = 0.5 * (np.sum(eigval, axis=-1) - unique_eigval)
z_sign = np.sign(unique_eigvec[..., -1])
dist = np.sqrt(1 - double_eigval / unique_eigval)
C = np.real(radius * (dist * z_sign)[..., None] * norm_vector(unique_eigvec)[1])
return C
def weighted_least_squares(A, y, weights):
"""Computes the weighted least squares solution of Ax=y.
Args:
A (Array ...,u,v): Design matrix.
y (Array ...,u): Target values.
weights (Array ...,u): Weights for each equation.
Returns:
Array ...,v : Weighted least squares solution.
"""
pinv = np.linalg.pinv(A * weights[..., np.newaxis])
result = np.einsum('...uv,...v->...u', pinv, y * weights)
return result
def iteratively_reweighted_least_squares(A, y, epsilon=1e-5, it=20):
"""Computes the iteratively reweighted least squares solution. of Ax=y
Args:
A (Array ..., u, v): Design matrix.
y (Array ..., u): Target values.
epsilon (float, optional): Small value to avoid division by zero. Defaults to 1e-5.
it (int, optional): Number of iterations. Defaults to 20.
Returns:
Array ..., v: Iteratively reweighted least squares solution.
"""
weights = np.ones(y.shape)
for _ in range(it):
result = weighted_least_squares(A, y, weights)
ychap = np.einsum('...uv, ...v->...u', A, result)
delta = np.abs(ychap-y)
weights = np.reciprocal(np.maximum(epsilon, np.sqrt(delta)))
return result
def lines_intersections_system(points, directions):
"""computes the system of equations for intersections of lines, Ax=b
where x is the instersection
Args:
points (Array ..., npoints, ndim): points through which the lines pass
directions (Array ..., npoints, ndim): direction vectors of the lines
Returns:
Array ..., 3*npoints, ndim: coefficient matrix A for the system of equations
Array ..., 3*npoints: right-hand side vector b for the system of equations
"""
n = norm_vector(directions)[1]
skew = np.swapaxes(cross_to_skew_matrix(n), -1, -2)
root = np.einsum('...uij, ...uj->...ui', skew, points)
A = np.concatenate(np.moveaxis(skew, -3, 0), axis=-2)
b = np.concatenate(np.moveaxis(root, -2, 0), axis=-1)
return A, b
def lines_intersections(points, directions):
"""computes the intersections of lines
Args:
points (Array ..., npoints, ndim): points through which the lines pass
directions (Array ..., npoints, ndim): direction vectors of the lines
Returns:
Array ..., ndim: intersection
"""
A, b = lines_intersections_system(points, directions)
x = iteratively_reweighted_least_squares(A, b)
return x
def line_sphere_intersection_determinant(center, radius, directions):
"""computes the determinant for the intersection of a line and a sphere,
Args:
center (Array ..., dim): center of the sphere
radius (Array ...): radius of the sphere
directions (Array ..., dim): direction of the line
Returns:
Array ...:intersection determinant
"""
directions_norm_2 = np.square(norm_vector(directions)[0])
center_norm_2 = np.square(norm_vector(center)[0])
dot_product_2 = np.square(dot_product(center, directions))
delta = dot_product_2 - directions_norm_2 * (center_norm_2 - np.square(radius))
return delta
def line_plane_intersection(normal, alpha, directions):
"""Computes the intersection points between a line and a plane.
Args:
normal (Array ..., ndim): Normal vector to the plane.
alpha (Array ...): Plane constant alpha.
directions (Array ..., dim): direction of the line
Returns:
Array ..., ndim: Intersection points between the line and the sphere.
"""
t = -alpha*np.reciprocal(dot_product(directions, normal))
intersection = directions*t[..., np.newaxis]
return intersection
def line_sphere_intersection(center, radius, directions):
"""Computes the intersection points between a line and a sphere.
Args:
center (Array ..., ndim): Center of the sphere.
radius (Array ...): Radius of the sphere.
directions (Array ..., ndim): Direction vectors of the line.
Returns:
Array ..., ndim: Intersection points between the line and the sphere.
Array bool ...: Mask of intersection points
"""
delta = line_sphere_intersection_determinant(center, radius, directions)
mask = delta > 0
directions_norm_2 = np.square(norm_vector(directions)[0])
distances = (dot_product(center, directions) - np.sqrt(np.maximum(0, delta))) * np.reciprocal(directions_norm_2)
intersection = np.expand_dims(distances, axis=-1) * directions
return intersection, mask
def sphere_intersection_normal(center, point):
"""Computes the normal vector at the intersection point on a sphere.
Args:
center (Array ..., dim): Coordinates of the sphere center.
point (Array ..., dim): Coordinates of the intersection point.
Returns:
Array ..., dim: Normal normal vector at the intersection point.
"""
vector = point - center
normal = norm_vector(vector)[1]
return normal
def estimate_light(normals, grey_levels, treshold=(0, 1)):
"""Estimates the light directions using the given normals, grey levels, and mask.
Args:
normals (Array ..., n, dim): Normal vectors.
grey_levels (Array ..., n): Grey levels corresponding to the normals.
threshold (tuple, optional): Intensity threshold for valid grey levels. Defaults to (0, 1).
Returns:
Array ..., dim: Estimated light directions.
"""
validity_mask = np.logical_and(grey_levels > treshold[0], grey_levels < treshold[1])
lights = weighted_least_squares(normals, grey_levels, validity_mask)
return lights
def plane_parameters_from_points(points):
"""Computes the parameters of a plane from a set of points.
Args:
points (Array ..., dim): Coordinates of the points used to define the plane.
Returns:
Array ..., dim: Normal vector to the plane.
Array ...: Plane constant alpha.
"""
homogeneous = to_homogeneous(points)
E = np.einsum('...ki,...kj->...ij', homogeneous, homogeneous)
L = matrix_kernel(E)
n, alpha = L[..., :-1], L[..., -1]
return n, alpha

21
routes/__init__.py Normal file
View File

@ -0,0 +1,21 @@
from flask import Blueprint, render_template
from .. import db
from . import object, calibration
blueprint = Blueprint('routes', __name__)
# Generic routes
@blueprint.route("/")
def index():
"""
Serves the index of nenuscanner.
"""
conn = db.get()
projects = db.Object.all_by_project(conn)
return render_template('index.html', projects=projects)
blueprint.register_blueprint(object.blueprint, url_prefix='/object')
blueprint.register_blueprint(calibration.blueprint, url_prefix='/calibration')

113
routes/calibration.py Normal file
View File

@ -0,0 +1,113 @@
from flask import Blueprint, Response, render_template, redirect, session
from os.path import join
import json
from .. import db, utils, scanner, config, calibration
blueprint = Blueprint('calibration', __name__)
@blueprint.route("/create")
def create():
"""
Creates a new calibration and redirects to the page to calibrate.
"""
conn = db.get()
with conn:
calibration = db.Calibration.create(conn)
session['calibration_id'] = calibration.id
return redirect('/calibration/calibrate')
@blueprint.route("/calibrate")
def calibrate():
"""
Returns the page to calibrate the system.
"""
conn = db.get()
if 'calibration_id' not in session:
with conn:
calibration = db.Calibration.create(conn)
session['calibration_id'] = calibration.id
else:
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
if calibration.state in [db.CalibrationState.Empty, db.CalibrationState.HasData]:
return render_template('calibrate.html')
else:
return render_template('calibration.html', calibration=calibration)
@blueprint.route('/scan')
def scan():
conn = db.get()
if 'calibration_id' not in session:
with conn:
calibration = db.Calibration.create(conn)
calibration_id = str(calibration.id)
session['calibration_id'] = calibration.id
else:
calibration_id = str(session['calibration_id'])
calibration = utils.get_calibration(conn)
def generate():
length = len(config.LEDS_UUIDS)
for index, led_uuid in enumerate(scanner.scan(join(config.CALIBRATION_DIR, calibration_id))):
yield f"{led_uuid},{(index+1)/length}\n"
with conn:
calibration.state = db.CalibrationState.HasData
calibration.save(conn)
return Response(generate(), mimetype='text/plain')
@blueprint.route('/compute')
def compute():
conn = db.get()
id = session['calibration_id']
calib = db.Calibration.get_from_id(id, conn)
if calib is None:
return 'oops', 404
calibration_json = calibration.calibrate(join(config.CALIBRATION_DIR, str(id)))
with open(join(config.CALIBRATION_DIR, str(id), 'calibration.json'), 'w') as f:
json.dump(calibration_json, f, indent=4)
with conn:
calib.state = db.CalibrationState.IsComputed
calib.save(conn)
return 'ok'
@blueprint.route('/cancel')
def cancel():
conn = db.get()
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
calibration.state = db.CalibrationState.HasData
with conn:
calibration.save(conn)
return redirect('/calibrate')
@blueprint.route('/validate')
def validate():
conn = db.get()
calib = utils.get_calibration(conn)
if calib is None:
return 'oops', 404
with conn:
calib.validate(conn)
return redirect('/')
@blueprint.route('/use-last')
def use_last():
conn = db.get()
calib = db.Calibration.get_last(conn)
session['calibration_id'] = calib.id
return redirect('/calibrate')

98
routes/object.py Normal file
View File

@ -0,0 +1,98 @@
from flask import Blueprint, render_template, redirect, request
import os
from os.path import join
import itertools
from .. import db, config, archive
blueprint = Blueprint('routes', __name__)
# Routes for object management
@blueprint.route('/<id>')
def get(id: int):
"""
Returns the page showing an object.
"""
conn = db.get()
object = db.Object.get_from_id(id, conn).full(conn)
return render_template('object.html', object=object)
@blueprint.route('/create', methods=['POST'])
def create():
"""
Creates a new object.
"""
conn = db.get()
with conn:
db.Object.create(request.form.get('name'), request.form.get('project'), conn)
return redirect('/')
@blueprint.route('/delete/<id>')
def delete(id: int):
"""
Deletes an object from its id.
"""
conn = db.get()
with conn:
db.Object.delete_from_id(id, conn)
return redirect('/')
def download_object(id: int, archive: archive.ArchiveSender):
"""
Helper for routes that send archives.
"""
conn = db.get()
object = db.Object.get_from_id(id, conn).full(conn)
# Group acquisitions sharing calibration
def keyfunc(x: db.Calibration) -> int:
return x.calibration_id
acquisitions_sorted = sorted(object.acquisitions, key=keyfunc)
acquisitions_grouped = [
(db.Calibration.get_from_id(k, conn), list(g))
for k, g in itertools.groupby(acquisitions_sorted, key=keyfunc)
]
# Create archive file to send
for calibration_index, (calib, acquisitions) in enumerate(acquisitions_grouped):
calibration_dir = join(config.CALIBRATION_DIR, str(calib.id))
# Add calibration images
for image in os.listdir(calibration_dir):
archive.add_file(
f'object/{calibration_index}/calibration/{image}',
join(calibration_dir, image)
)
# Add each acquisition
for acquisition_index, acquisition in enumerate(acquisitions):
acquisition_dir = join(config.OBJECT_DIR, str(object.id), str(acquisition.id))
for image in os.listdir(acquisition_dir):
archive.add_file(
f'object/{calibration_index}/{acquisition_index}/{image}',
join(acquisition_dir, image)
)
return archive.response()
@blueprint.route('/download/tar/<id>')
def download_object_tar(id: int):
"""
Downloads an object as a tar archive.
"""
return download_object(id, archive.TarSender())
@blueprint.route('/download/zip/<id>')
def download_object_zip(id: int):
"""
Downloads an object as a zip archive.
"""
return download_object(id, archive.ZipSender())

View File

@ -22,25 +22,25 @@
</div>
<div id="navbarBasicExample" class="navbar-menu">
<div class="navbar-end">
<a id="calibration-tag-0" class="calibration-tag navbar-item" href="/calibrate/" {% if calibration.state != 0 %}style="display: none;"{% endif %}>
<a id="calibration-tag-0" class="calibration-tag navbar-item" href="/calibration/calibrate" {% if calibration.state != 0 %}style="display: none;"{% endif %}>
<span id="calibration-tag-0" class="tags has-addons">
<span class="tag is-dark">étalonnage</span>
<span class="tag is-danger">aucune donnée</span>
</span>
</a>
<a id="calibration-tag-1" class="calibration-tag navbar-item" href="/calibrate/" {% if calibration.state != 1 %}style="display: none;"{% endif %}>
<a id="calibration-tag-1" class="calibration-tag navbar-item" href="/calibration/calibrate" {% if calibration.state != 1 %}style="display: none;"{% endif %}>
<span class="tags has-addons" >
<span class="tag is-dark">étalonnage</span>
<span class="tag is-warning">non calculé</span>
</span>
</a>
<a id="calibration-tag-2" class="calibration-tag navbar-item" href="/calibrate/" {% if calibration.state != 2 %}style="display: none;"{% endif %}>
<a id="calibration-tag-2" class="calibration-tag navbar-item" href="/calibration/calibrate" {% if calibration.state != 2 %}style="display: none;"{% endif %}>
<span class="tags has-addons">
<span class="tag is-dark">étalonnage</span>
<span class="tag is-warning">non validé</span>
</span>
</a>
<a id="calibration-tag-3" class="calibration-tag navbar-item" href="/calibrate" {% if calibration.state != 3 %}style="display: none;"{% endif %}>
<a id="calibration-tag-3" class="calibration-tag navbar-item" href="/calibration/calibrate" {% if calibration.state != 3 %}style="display: none;"{% endif %}>
<span class="tags has-addons">
<span class="tag is-dark">étalonnage</span>
<span class="tag is-success">validé le {{ calibration.get_pretty_short_date() }}</span>

View File

@ -12,7 +12,7 @@
<button id="scan-button" class="button is-link">Acquérir les données d'étalonnage</button>
</div>
<div class="control">
<a href="/use-last-calibration" class="button is-link">Réutiliser le dernier étalonnage</a>
<a href="/calibration/use-last" class="button is-link">Réutiliser le dernier étalonnage</a>
</div>
</div>
<article id="error-container" class="message is-danger" style="display: none;">
@ -68,7 +68,7 @@
progressBar.style.display = "block";
grid.innerHTML = '';
let response = await fetch('/api/scan-for-calibration');
let response = await fetch('/calibration/scan');
let reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
while (true) {
@ -101,7 +101,7 @@
calibrateButton.addEventListener('click', async () => {
calibrateButton.classList.add('is-loading');
await fetch('/api/calibrate');
await fetch('/calibration/compute');
window.location.reload();
});

View File

@ -51,14 +51,14 @@
</button>
</div>
<div class="control">
<a href="/new-calibration" class="button is-link">Créer un nouvel étalonnage</a>
<a href="/calibration/create" class="button is-link">Créer un nouvel étalonnage</a>
</div>
{% else %}
<div class="control">
<a href="/cancel-calibration" class="button">Retourner à la page d'acquisition</a>
<a href="/calibration/cancel" class="button">Retourner à la page d'acquisition</a>
</div>
<div class="control">
<a href="/validate-calibration" class="button is-link">Valider l'étalonnage</a>
<a href="/calibration/validate" class="button is-link">Valider l'étalonnage</a>
</div>
{% endif %}
</div>

View File

@ -64,7 +64,7 @@
</div>
<div id="add-object-modal" class="modal">
<div class="modal-background"></div>
<form action="/create-object/" method="POST">
<form action="/object/create" method="POST">
<div class="modal-content">
<div class="field">
<label class="label">Nom du projet</label>

View File

@ -6,10 +6,10 @@
<h1 class="title">{{ object.name }}</h1>
<div class="field is-grouped">
<div class="control">
<a class="button is-link" href="/download-object/tar/{{ object.id }}">Télécharger les données de l'objet (archive TAR)</a>
<a class="button is-link" href="/object/download/tar/{{ object.id }}">Télécharger les données de l'objet (archive TAR)</a>
</div>
<div class="control">
<a class="button is-link" href="/download-object/zip/{{ object.id }}">Télécharger les données de l'objet (archive ZIP)</a>
<a class="button is-link" href="/object/download/zip/{{ object.id }}">Télécharger les données de l'objet (archive ZIP)</a>
</div>
</div>
{% if object.acquisitions %}
@ -42,7 +42,7 @@
</div>
{% endif %}
<div class="control">
<a href="/delete-object/{{ object.id }}" class="button is-danger">Supprimer cet objet</a>
<a href="/object/delete/{{ object.id }}" class="button is-danger">Supprimer cet objet</a>
</div>
</div>
@ -54,10 +54,10 @@
<div class="field is-grouped is-grouped-centered">
<div class="control">
<a href="/calibrate/" class="button is-link">Étalonner le scanner</a>
<a href="/calibration/calibrate/" class="button is-link">Étalonner le scanner</a>
</div>
<div class="control">
<button id="use-last-calibration-button" href="/use-last-calibration/" class="button is-link">Réutiliser le dernier étalonnage</button>
<button id="use-last-calibration-button" href="/calibration/use-last/" class="button is-link">Réutiliser le dernier étalonnage</button>
</div>
</div>
</div>

528
utils.py
View File

@ -1,524 +1,16 @@
import numpy as np
import scipy.ndimage as ndimage
from flask import session
import sqlite3
from . import db
def dot_product(v1, v2):
"""Computes the dot product between two arrays of vectors.
Args:
v1 (Array ..., ndim): First array of vectors.
v2 (Array ..., ndim): Second array of vectors.
Returns:
Array ...: Dot product between v1 and v2.
def get_calibration(conn: sqlite3.Connection) -> db.Calibration:
"""
result = np.einsum('...i,...i->...', v1, v2)
return result
Retrieves the calibration from the session and the database.
def norm_vector(v):
"""computes the norm and direction of vectors
Args:
v (Array ..., dim): vectors to compute the norm and direction for
Returns:
Array ...: norms of the vectors
Array ..., dim: unit direction vectors
Returns empty calibration if nothing is found.
"""
norm = np.linalg.norm(v, axis=-1)
direction = v/norm[..., np.newaxis]
return norm, direction
calibration_id = session.get('calibration_id', None)
if calibration_id is None:
return db.Calibration.Dummy
def to_homogeneous(v):
"""converts vectors to homogeneous coordinates
Args:
v (Array ..., dim): input vectors
Returns:
Array ..., dim+1: homogeneous coordinates of the input vectors
"""
append_term = np.ones(np.shape(v)[:-1] + (1,))
homogeneous = np.append(v, append_term, axis=-1)
return homogeneous
def cross_to_skew_matrix(v):
"""converts a vector cross product to a skew-symmetric matrix multiplication
Args:
v (Array ..., 3): vectors to convert
Returns:
Array ..., 3, 3: matrices corresponding to the input vectors
"""
indices = np.asarray([[-1, 2, 1], [2, -1, 0], [1, 0, -1]])
signs = np.asarray([[0, -1, 1], [1, 0, -1], [-1, 1, 0]])
skew_matrix = v[..., indices] * signs
return skew_matrix
def build_K_matrix(focal_length, u0, v0):
"""
Build the camera intrinsic matrix.
Parameters:
focal_length (float): Focal length of the camera.
u0 (float): First coordinate of the principal point.
v0 (float): Seccond coordinate of the principal point.
Returns:
numpy.ndarray: Camera intrinsic matrix (3x3).
"""
K = np.asarray([[focal_length, 0, u0],
[0, focal_length, v0],
[0, 0, 1]])
return K
def get_camera_rays(points, K):
"""Computes the camera rays for a set of points given the camera matrix K.
Args:
points (Array ..., 2): Points in the image plane.
K (Array 3, 3): Camera intrinsic matrix.
Returns:
Array ..., 3: Camera rays corresponding to the input points.
"""
homogeneous = to_homogeneous(points)
inv_K = np.linalg.inv(K)
rays = np.einsum('ij,...j->...i', inv_K, homogeneous)
return rays
def matrix_kernel(A):
"""Computes the eigenvector corresponding to the smallest eigenvalue of the matrix A.
Args:
A (Array ..., n, n): Input square matrix.
Returns:
Array ..., n: Eigenvector corresponding to the smallest eigenvalue.
"""
eigval, eigvec = np.linalg.eig(A)
min_index = np.argmin(np.abs(eigval), axis=-1)
min_eigvec = np.take_along_axis(eigvec, min_index[..., None, None], -1)[..., 0]
normed_eigvec = norm_vector(min_eigvec)[1]
return normed_eigvec
def evaluate_bilinear_form(Q, left, right):
"""evaluates bilinear forms at several points
Args:
Q (Array ...,ldim,rdim): bilinear form to evaluate
left (Array ...,ldim): points where the bilinear form is evaluated to the left
right (Array ...,rdim): points where the bilinear form is evaluated to the right
Returns:
Array ... bilinear forms evaluated
"""
result = np.einsum('...ij,...i,...j->...', Q, left, right)
return result
def evaluate_quadratic_form(Q, points):
"""evaluates quadratic forms at several points
Args:
Q (Array ...,dim,dim): quadratic form to evaluate
points (Array ...,dim): points where the quadratic form is evaluated
Returns:
Array ... quadratic forms evaluated
"""
result = evaluate_bilinear_form(Q, points, points)
return result
def merge_quadratic_to_homogeneous(Q, b, c):
"""merges quadratic form, linear term, and constant term into a homogeneous matrix
Args:
Q (Array ..., dim, dim): quadratic form matrix
b (Array ..., dim): linear term vector
c (Array ...): constant term
Returns:
Array ..., dim+1, dim+1: homogeneous matrix representing the quadratic form
"""
dim_points = Q.shape[-1]
stack_shape = np.broadcast_shapes(np.shape(Q)[:-2], np.shape(b)[:-1], np.shape(c))
Q_b = np.broadcast_to(Q, stack_shape + (dim_points, dim_points))
b_b = np.broadcast_to(np.expand_dims(b, -1), stack_shape+(dim_points, 1))
c_b = np.broadcast_to(np.expand_dims(c, (-1, -2)), stack_shape + (1, 1))
H = np.block([[Q_b, 0.5 * b_b], [0.5 * np.swapaxes(b_b, -1, -2), c_b]])
return H
def quadratic_to_dot_product(points):
"""computes the matrix W such that
x.T@Ax = W(x).T*A[ui,uj]
Args:
points ( Array ...,ndim): points of dimension ndim
Returns:
Array ...,ni: dot product matrix (W)
Array ni: i indices of central matrix
Array ni: j indices of central matrix
"""
dim_points = points.shape[-1]
ui, uj = np.triu_indices(dim_points)
W = points[..., ui] * points[..., uj]
return W, ui, uj
def fit_quadratic_form(points):
"""Fits a quadratic form to the given zeroes.
Args:
points (Array ..., n, dim): Input points.
Returns:
Array ..., dim, dim: Fitted quadratic form matrix.
"""
dim_points = points.shape[-1]
normed_points = norm_vector(points)[1]
W, ui, uj = quadratic_to_dot_product(normed_points)
H = np.einsum('...ki,...kj->...ij', W, W)
V0 = matrix_kernel(H)
Q = np.zeros(V0.shape[:-1] + (dim_points, dim_points))
Q[..., ui, uj] = V0
return Q
def gaussian_pdf(mu, sigma, x):
"""Computes the PDF of a multivariate Gaussian distribution.
Args:
mu (Array ...,k): Mean vector.
sigma (Array ...,k,k): Covariance matrix.
x (Array ...,k): Input vector.
Returns:
Array ...: Value of the PDF.
"""
k = np.shape(x)[-1]
Q = np.linalg.inv(sigma)
normalization = np.reciprocal(np.sqrt(np.linalg.det(sigma) * np.power(2.0 * np.pi, k)))
quadratic = evaluate_quadratic_form(Q, x - mu)
result = np.exp(-0.5 * quadratic) * normalization
return result
def gaussian_estimation(x, weights):
"""Estimates the mean and covariance matrix of a Gaussian distribution.
Args:
x (Array ...,n,dim): Data points.
weights (Array ...,n): Weights for each data point.
Returns:
Array ...,dim: Estimated mean vector.
Array ...,dim,dim: Estimated covariance matrix.
"""
weights_sum = np.sum(weights, axis=-1)
mu = np.sum(x*np.expand_dims(weights, axis=-1), axis=-2) / np.expand_dims(weights_sum, axis=-1)
centered_x = x - np.expand_dims(mu, axis=-2)
sigma = np.einsum('...s, ...si, ...sj->...ij', weights, centered_x, centered_x)/np.expand_dims(weights_sum, axis=(-1, -2))
return mu, sigma
def gaussian_mixture_estimation(x, init_params, it=100):
"""Estimates the parameters of a k Gaussian mixture model using the EM algorithm.
Args:
x (Array ..., n, dim): Data points.
init_params (tuple): Initial parameters (pi, sigma, mu).
pi (Array ..., k): Initial mixture weights.
sigma (Array ..., k, dim, dim): Initial covariance matrices.
mu (Array ..., k, dim): Initial means.
it (int, optional): Number of iterations. Defaults to 100.
Returns:
Tuple[(Array ..., k), (Array ..., k, dim, dim), (Array ..., k, dim)]:
Estimated mixture weights,covariance matrices, means.
"""
pi, sigma, mu = init_params
for _ in range(it):
pdf = gaussian_pdf(
np.expand_dims(mu, axis=-2),
np.expand_dims(sigma, axis=-3),
np.expand_dims(x, axis=-3)
) * np.expand_dims(pi, axis=-1)
weights = pdf/np.sum(pdf, axis=-2, keepdims=True)
pi = np.mean(weights, axis=-1)
mu, sigma = gaussian_estimation(x, weights)
return pi, sigma, mu
def maximum_likelihood(x, params):
"""Selects the best gaussian model for a point
Args:
x (Array ..., dim): Data points.
params (tuple): Gaussians parameters (pi, sigma, mu).
pi (Array ..., k): Mixture weights.
sigma (Array ..., k, dim, dim): Covariance matrices.
mu (Array ..., k, dim): Means.
Returns:
Array ...: integer in [0,k-1] giving the maximum likelihood model
"""
pi, sigma, mu = params
pdf = gaussian_pdf(mu, sigma, np.expand_dims(x, axis=-2))*pi
result = np.argmax(pdf, axis=-1)
return result
def get_greatest_components(mask, n):
"""
Extract the n largest connected components from a binary mask.
Parameters:
mask (Array ...): The binary mask.
n (int): The number of largest connected components to extract.
Returns:
Array n,...: A boolean array of the n largest connected components
"""
labeled, _ = ndimage.label(mask)
unique, counts = np.unique(labeled, return_counts=True)
greatest_labels = unique[unique != 0][np.argsort(counts[unique != 0])[-n:]]
greatest_components = labeled[np.newaxis, ...] == np.expand_dims(greatest_labels, axis=tuple(range(1, 1 + mask.ndim)))
return greatest_components
def get_mask_border(mask):
"""
Extract the border from a binary mask.
Parameters:
mask (Array ...): The binary mask.
Returns:
Array ...: A boolean array mask of the border
"""
inverted_mask = np.logical_not(mask)
dilated = ndimage.binary_dilation(inverted_mask)
border = np.logical_and(mask, dilated)
return border
def select_binary_mask(mask, metric):
"""Selects the side of a binary mask that optimizes the given metric.
Args:
mask (Array bool ...): Initial binary mask.
metric (function): Function to evaluate the quality of the mask.
Returns:
Array bool ...: Selected binary mask that maximizes the metric.
"""
inverted = np.logical_not(mask)
result = mask if metric(mask) > metric(inverted) else inverted
return result
def deproject_ellipse_to_sphere(M, radius):
"""finds the deprojection of an ellipse to a sphere
Args:
M (Array 3,3): Ellipse quadratic form
radius (float): radius of the researched sphere
Returns:
Array 3: solution of sphere centre location
"""
H = 0.5 * (np.swapaxes(M, -1, -2) + M)
eigval, eigvec = np.linalg.eigh(H)
i_unique = np.argmax(np.abs(np.median(eigval, axis=-1, keepdims=True) - eigval), axis=-1)
unique_eigval = np.take_along_axis(eigval, i_unique[..., None], -1)[..., 0]
unique_eigvec = np.take_along_axis(eigvec, i_unique[..., None, None], -1)[..., 0]
double_eigval = 0.5 * (np.sum(eigval, axis=-1) - unique_eigval)
z_sign = np.sign(unique_eigvec[..., -1])
dist = np.sqrt(1 - double_eigval / unique_eigval)
C = np.real(radius * (dist * z_sign)[..., None] * norm_vector(unique_eigvec)[1])
return C
def weighted_least_squares(A, y, weights):
"""Computes the weighted least squares solution of Ax=y.
Args:
A (Array ...,u,v): Design matrix.
y (Array ...,u): Target values.
weights (Array ...,u): Weights for each equation.
Returns:
Array ...,v : Weighted least squares solution.
"""
pinv = np.linalg.pinv(A * weights[..., np.newaxis])
result = np.einsum('...uv,...v->...u', pinv, y * weights)
return result
def iteratively_reweighted_least_squares(A, y, epsilon=1e-5, it=20):
"""Computes the iteratively reweighted least squares solution. of Ax=y
Args:
A (Array ..., u, v): Design matrix.
y (Array ..., u): Target values.
epsilon (float, optional): Small value to avoid division by zero. Defaults to 1e-5.
it (int, optional): Number of iterations. Defaults to 20.
Returns:
Array ..., v: Iteratively reweighted least squares solution.
"""
weights = np.ones(y.shape)
for _ in range(it):
result = weighted_least_squares(A, y, weights)
ychap = np.einsum('...uv, ...v->...u', A, result)
delta = np.abs(ychap-y)
weights = np.reciprocal(np.maximum(epsilon, np.sqrt(delta)))
return result
def lines_intersections_system(points, directions):
"""computes the system of equations for intersections of lines, Ax=b
where x is the instersection
Args:
points (Array ..., npoints, ndim): points through which the lines pass
directions (Array ..., npoints, ndim): direction vectors of the lines
Returns:
Array ..., 3*npoints, ndim: coefficient matrix A for the system of equations
Array ..., 3*npoints: right-hand side vector b for the system of equations
"""
n = norm_vector(directions)[1]
skew = np.swapaxes(cross_to_skew_matrix(n), -1, -2)
root = np.einsum('...uij, ...uj->...ui', skew, points)
A = np.concatenate(np.moveaxis(skew, -3, 0), axis=-2)
b = np.concatenate(np.moveaxis(root, -2, 0), axis=-1)
return A, b
def lines_intersections(points, directions):
"""computes the intersections of lines
Args:
points (Array ..., npoints, ndim): points through which the lines pass
directions (Array ..., npoints, ndim): direction vectors of the lines
Returns:
Array ..., ndim: intersection
"""
A, b = lines_intersections_system(points, directions)
x = iteratively_reweighted_least_squares(A, b)
return x
def line_sphere_intersection_determinant(center, radius, directions):
"""computes the determinant for the intersection of a line and a sphere,
Args:
center (Array ..., dim): center of the sphere
radius (Array ...): radius of the sphere
directions (Array ..., dim): direction of the line
Returns:
Array ...:intersection determinant
"""
directions_norm_2 = np.square(norm_vector(directions)[0])
center_norm_2 = np.square(norm_vector(center)[0])
dot_product_2 = np.square(dot_product(center, directions))
delta = dot_product_2 - directions_norm_2 * (center_norm_2 - np.square(radius))
return delta
def line_plane_intersection(normal, alpha, directions):
"""Computes the intersection points between a line and a plane.
Args:
normal (Array ..., ndim): Normal vector to the plane.
alpha (Array ...): Plane constant alpha.
directions (Array ..., dim): direction of the line
Returns:
Array ..., ndim: Intersection points between the line and the sphere.
"""
t = -alpha*np.reciprocal(dot_product(directions, normal))
intersection = directions*t[..., np.newaxis]
return intersection
def line_sphere_intersection(center, radius, directions):
"""Computes the intersection points between a line and a sphere.
Args:
center (Array ..., ndim): Center of the sphere.
radius (Array ...): Radius of the sphere.
directions (Array ..., ndim): Direction vectors of the line.
Returns:
Array ..., ndim: Intersection points between the line and the sphere.
Array bool ...: Mask of intersection points
"""
delta = line_sphere_intersection_determinant(center, radius, directions)
mask = delta > 0
directions_norm_2 = np.square(norm_vector(directions)[0])
distances = (dot_product(center, directions) - np.sqrt(np.maximum(0, delta))) * np.reciprocal(directions_norm_2)
intersection = np.expand_dims(distances, axis=-1) * directions
return intersection, mask
def sphere_intersection_normal(center, point):
"""Computes the normal vector at the intersection point on a sphere.
Args:
center (Array ..., dim): Coordinates of the sphere center.
point (Array ..., dim): Coordinates of the intersection point.
Returns:
Array ..., dim: Normal normal vector at the intersection point.
"""
vector = point - center
normal = norm_vector(vector)[1]
return normal
def estimate_light(normals, grey_levels, treshold=(0, 1)):
"""Estimates the light directions using the given normals, grey levels, and mask.
Args:
normals (Array ..., n, dim): Normal vectors.
grey_levels (Array ..., n): Grey levels corresponding to the normals.
threshold (tuple, optional): Intensity threshold for valid grey levels. Defaults to (0, 1).
Returns:
Array ..., dim: Estimated light directions.
"""
validity_mask = np.logical_and(grey_levels > treshold[0], grey_levels < treshold[1])
lights = weighted_least_squares(normals, grey_levels, validity_mask)
return lights
def plane_parameters_from_points(points):
"""Computes the parameters of a plane from a set of points.
Args:
points (Array ..., dim): Coordinates of the points used to define the plane.
Returns:
Array ..., dim: Normal vector to the plane.
Array ...: Plane constant alpha.
"""
homogeneous = to_homogeneous(points)
E = np.einsum('...ki,...kj->...ij', homogeneous, homogeneous)
L = matrix_kernel(E)
n, alpha = L[..., :-1], L[..., -1]
return n, alpha
return db.Calibration.get_from_id(calibration_id, conn)