#!/usr/bin/env python
"""Flask application serving the object, scan and calibration pages."""

import itertools
import json
import os
from os.path import join
import sqlite3
import uuid

from flask import (
    Flask,
    redirect,
    request,
    render_template,
    send_from_directory,
    session,
)

from . import db, config, scanner, calibration, archive

app = Flask(__name__)

# Manage secret key
try:
    from . import secret
    app.config['SECRET_KEY'] = secret.SECRET_KEY
except ImportError:
    # Secret key file does not exist, create it.
    # `from . import secret` is resolved inside this package, so the file
    # must be written next to this module (not in the current working
    # directory), otherwise a new key would be generated on every start.
    generated_key = os.urandom(50).hex()
    secret_path = join(os.path.dirname(os.path.abspath(__file__)), 'secret.py')
    with open(secret_path, 'w') as f:
        f.write(f'SECRET_KEY = "{generated_key}"')
    app.config['SECRET_KEY'] = generated_key


def get_calibration(conn: sqlite3.Connection) -> db.Calibration:
    """Return the calibration referenced by the session.

    Returns ``db.Calibration.Dummy`` when the session holds no
    ``calibration_id``; otherwise the result of
    ``db.Calibration.get_from_id`` for that id.
    """
    calibration_id = session.get('calibration_id', None)
    if calibration_id is None:
        return db.Calibration.Dummy
    return db.Calibration.get_from_id(calibration_id, conn)


@app.context_processor
def inject_stage_and_region():
    """Expose the current calibration, LED ids and state enum to templates."""
    conn = db.get()
    return dict(
        calibration=get_calibration(conn),
        leds=config.LEDS_UUIDS,
        CalibrationState=db.CalibrationState,
    )


@app.before_request
def manage_auto_use_last_calibration():
    """Select the last calibration when the session has none.

    Only active when ``config.AUTO_USE_LAST_CALIBRATION`` is truthy.
    """
    if config.AUTO_USE_LAST_CALIBRATION and 'calibration_id' not in session:
        conn = db.get()
        last = db.Calibration.get_last(conn)
        if last is not None:
            session['calibration_id'] = last.id


@app.route("/")
def index():
    """Render the list of all objects."""
    conn = db.get()
    objects = db.Object.all(conn)
    return render_template('index.html', objects=objects)


@app.route("/create-object/", methods=["POST"])
def create_object():
    """Create an object named after the ``name`` form field, then go home."""
    conn = db.get()
    with conn:
        db.Object.create(request.form.get('name'), conn)
    return redirect('/')


@app.route('/object/<int:id>')
def object(id: int):
    """Render the detail page of the object with the given id (404 if none)."""
    conn = db.get()
    found = db.Object.get_from_id(id, conn)
    if found is None:
        return f"Aucun objet d'id {id}", 404
    return render_template('object.html', object=found.full(conn))


@app.route('/scan/<int:id>')
def scan(id: int):
    """Render the scan page for a new acquisition of the given object.

    Raises:
        RuntimeError: if no calibration is selected in the session.
    """
    conn = db.get()
    calibration_id = session.get('calibration_id', None)
    found = db.Object.get_from_id(id, conn)
    if calibration_id is None:
        raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
    if found is None:
        return f"Aucun objet d'id {id}", 404
    return render_template('scan.html', object=found)
@app.route('/scan-acquisition/<int:id>')
def scan_existing(id: int):
    """Render the scan page for re-scanning an existing acquisition.

    Raises:
        RuntimeError: if no calibration is selected in the session.
    """
    conn = db.get()
    calibration_id = session.get('calibration_id', None)
    acquisition = db.Acquisition.get_from_id(id, conn)
    if acquisition is None:
        return f"Aucune acquisition d'id {id}", 404
    object = acquisition.object(conn)
    if calibration_id is None:
        raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
    return render_template('scan.html', object=object, acquisition=acquisition)


@app.route("/calibrate/")
def calibrate():
    """Render the calibration page, creating a calibration if none is selected.

    Shows ``calibrate.html`` while the calibration is Empty or HasData, and
    ``calibration.html`` for any other state.
    """
    conn = db.get()
    if 'calibration_id' not in session:
        with conn:
            calibration = db.Calibration.create(conn)
            session['calibration_id'] = calibration.id
    else:
        calibration = db.Calibration.get_from_id(session['calibration_id'], conn)

    if calibration.state in [db.CalibrationState.Empty, db.CalibrationState.HasData]:
        return render_template('calibrate.html')
    else:
        return render_template('calibration.html', calibration=calibration)


@app.route("/new-calibration")
def new_calibration():
    """Create a fresh calibration, select it in the session and redirect."""
    conn = db.get()
    with conn:
        calibration = db.Calibration.create(conn)
        session['calibration_id'] = calibration.id
    return redirect('/calibrate')


@app.route("/cancel-calibration")
def cancel_calibration():
    """Reset the selected calibration to the HasData state and redirect.

    When no calibration is selected (or it cannot be found) there is nothing
    to reset, so the request only redirects.
    """
    conn = db.get()
    calibration_id = session.get('calibration_id', None)
    calibration = None
    if calibration_id is not None:
        calibration = db.Calibration.get_from_id(calibration_id, conn)
    if calibration is not None:
        calibration.state = db.CalibrationState.HasData
        with conn:
            calibration.save(conn)
    return redirect('/calibrate')


@app.route("/api/preview/<int:id>")
def preview(id: int):
    """Capture a preview image for an object and return the capture's UUID.

    The image is written to ``DATA_DIR/{id}/previews/{uuid}.jpg``. Responds
    with status 500 when the scanner reports a failed capture.
    """
    conn = db.get()
    if db.Object.get_from_id(id, conn) is None:
        return f"Aucun objet d'id {id}", 404
    capture_uuid = uuid.uuid4()
    if scanner.capture(join(config.DATA_DIR, str(id), 'previews', str(capture_uuid) + '.jpg')):
        return str(capture_uuid)
    else:
        return "Impossible de capturer l'image.", 500


@app.route("/api/scan-for-calibration")
def scan_calibration():
    """Stream the progress of a calibration scan as ``led_uuid,fraction`` lines.

    A calibration is created when the session has none. Once the scan
    generator is exhausted, the calibration is saved in the HasData state.
    """
    conn = db.get()
    if 'calibration_id' not in session:
        with conn:
            calibration = db.Calibration.create(conn)
            calibration_id = str(calibration.id)
            session['calibration_id'] = calibration.id
    else:
        calibration_id = str(session['calibration_id'])
        calibration = get_calibration(conn)

    def generate():
        length = len(config.LEDS_UUIDS)
        for index, led_uuid in enumerate(scanner.scan(join(config.CALIBRATION_DIR, calibration_id))):
            yield f"{led_uuid},{(index+1)/length}\n"
        with conn:
            calibration.state = db.CalibrationState.HasData
            calibration.save(conn)

    return app.response_class(generate(), mimetype='text/plain')


@app.route("/api/scan-for-object/<int:object_id>")
def scan_object(object_id: int):
    """Create an acquisition for an object and stream the scan progress.

    The stream starts with the new acquisition id, followed by
    ``led_uuid,fraction`` lines.

    Raises:
        RuntimeError: if no calibration is selected or the object is unknown.
    """
    conn = db.get()
    calibration_id = session.get('calibration_id', None)
    if calibration_id is None:
        raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")

    object = db.Object.get_from_id(object_id, conn)
    if object is None:
        raise RuntimeError(f"Aucun objet d'id {object_id}")

    with conn:
        acquisition = object.add_acquisition(calibration_id, conn)

    def generate():
        # NOTE(review): the id is emitted without a trailing newline, so it
        # is glued to the first progress line; kept as-is because the client
        # parsing is not visible here.
        yield str(acquisition.id)
        length = len(config.LEDS_UUIDS)
        for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
            yield f"{led_uuid},{(index+1)/length}\n"

    return app.response_class(generate(), mimetype='text/plain')


@app.route("/api/scan-for-acquisition/<int:acquisition_id>")
def scan_acquisition(acquisition_id: int):
    """Re-run the scan of an existing acquisition and stream its progress.

    Raises:
        RuntimeError: if no calibration is selected or the acquisition is
            unknown.
    """
    conn = db.get()
    calibration_id = session.get('calibration_id', None)
    if calibration_id is None:
        raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")

    acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
    if acquisition is None:
        raise RuntimeError(f"Aucun acquisition d'id {acquisition_id}")

    object = acquisition.object(conn)

    def generate():
        length = len(config.LEDS_UUIDS)
        for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
            yield f"{led_uuid},{(index+1)/length}\n"

    return app.response_class(generate(), mimetype='text/plain')


@app.route("/validate-acquisition/<int:acquisition_id>")
def validate_acquisition(acquisition_id: int):
    """Mark an acquisition as validated and redirect to its object page.

    Raises:
        RuntimeError: if the acquisition is unknown.
    """
    conn = db.get()
    acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
    if acquisition is None:
        # Raising a bare string is itself a TypeError; raise a real exception.
        raise RuntimeError(f"Aucune acquisition d'id {acquisition_id}")

    object = acquisition.object(conn)
    acquisition.validated = True
    with conn:
        acquisition.save(conn)

    return redirect(f'/object/{object.id}')


@app.route('/api/use-last-calibration')
def use_last_calibration():
    """Select the last calibration in the session (404 when there is none)."""
    conn = db.get()
    calibration = db.Calibration.get_last(conn)
    if calibration is None:
        return "Aucun étalonnage disponible.", 404
    session['calibration_id'] = calibration.id
    return 'ok'


@app.route("/api/calibrate")
def run_calibration():
    """Compute the selected calibration and store the result as JSON.

    The result of ``calibration.calibrate`` is written to
    ``CALIBRATION_DIR/{id}/calibration.json`` and the calibration is saved in
    the IsComputed state. Responds 404 when no calibration is selected or it
    cannot be found.
    """
    conn = db.get()
    calibration_id = session.get('calibration_id', None)
    if calibration_id is None:
        return 'oops', 404
    calib = db.Calibration.get_from_id(calibration_id, conn)
    if calib is None:
        return 'oops', 404

    calibration_dir = join(config.CALIBRATION_DIR, str(calibration_id))
    calibration_json = calibration.calibrate(calibration_dir)
    with open(join(calibration_dir, 'calibration.json'), 'w') as f:
        json.dump(calibration_json, f, indent=4)
    with conn:
        calib.state = db.CalibrationState.IsComputed
        calib.save(conn)

    return 'ok'


@app.route('/validate-calibration')
def validate_calibration():
    """Validate the session's calibration and redirect to the home page."""
    conn = db.get()
    calib = get_calibration(conn)
    if calib is None:
        return 'oops', 404

    with conn:
        calib.validate(conn)

    return redirect('/')


def download_object(id: int, tar: archive.ArchiveSender):
    """Build an archive of an object's acquisitions and return its response.

    Acquisitions are grouped by ``calibration_id``. For each group the files
    of the calibration directory are added under
    ``object/{group}/calibration/`` and the files of each acquisition under
    ``object/{group}/{acquisition}/``, where both placeholders are
    zero-based indices. Responds 404 when the object is unknown.
    """
    conn = db.get()
    found = db.Object.get_from_id(id, conn)
    if found is None:
        return f"Aucun objet d'id {id}", 404
    object = found.full(conn)

    # Group acquisitions sharing calibration (groupby needs sorted input)
    def keyfunc(x) -> int:
        return x.calibration_id

    acquisitions_sorted = sorted(object.acquisitions, key=keyfunc)
    acquisitions_grouped = [
        (db.Calibration.get_from_id(k, conn), list(g))
        for k, g in itertools.groupby(acquisitions_sorted, key=keyfunc)
    ]

    # Create archive file to send
    for calibration_index, (calib, acquisitions) in enumerate(acquisitions_grouped):
        calibration_dir = join(config.CALIBRATION_DIR, str(calib.id))

        # Add calibration images
        for image in os.listdir(calibration_dir):
            tar.add_file(
                f'object/{calibration_index}/calibration/{image}',
                join(calibration_dir, image)
            )

        # Add each acquisition
        for acquisition_index, acquisition in enumerate(acquisitions):
            acquisition_dir = join(config.OBJECT_DIR, str(object.id), str(acquisition.id))

            for image in os.listdir(acquisition_dir):
                tar.add_file(
                    f'object/{calibration_index}/{acquisition_index}/{image}',
                    join(acquisition_dir, image)
                )

    return tar.response()


@app.route('/download-object/tar/<int:id>')
def download_object_tar(id: int):
    """Send the object's archive using ``archive.TarSender``."""
    return download_object(id, archive.TarSender())


@app.route('/download-object/zip/<int:id>')
def download_object_zip(id: int):
    """Send the object's archive using ``archive.ZipSender``."""
    return download_object(id, archive.ZipSender())


@app.route('/static/<path:path>')
def send_static(path):
    """Serve a file from the ``static`` directory."""
    return send_from_directory('static', path)


@app.route('/data/<path:path>')
def send_data(path):
    """Serve a file from the ``data`` directory."""
    return send_from_directory('data', path)