280 lines
7.9 KiB
Python
Executable File
280 lines
7.9 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
from flask import Flask, redirect, request, render_template, send_from_directory, session
|
|
import json
|
|
import os
|
|
from os.path import join
|
|
import sqlite3
|
|
import uuid
|
|
from typing import Optional
|
|
from . import db, config, scanner, calibration
|
|
|
|
app = Flask(__name__)
|
|
|
|
# Manage secret key
|
|
try:
|
|
from . import secret
|
|
app.config['SECRET_KEY'] = secret.SECRET_KEY
|
|
except ImportError:
|
|
# Secret key file does not exist, create it
|
|
secret = os.urandom(50).hex()
|
|
with open('secret.py', 'w') as f:
|
|
f.write(f'SECRET_KEY = "{secret}"')
|
|
app.config['SECRET_KEY'] = secret
|
|
|
|
|
|
def get_calibration(conn: sqlite3.Connection) -> db.Calibration:
|
|
calibration_id = session.get('calibration_id', None)
|
|
if calibration_id is None:
|
|
return db.Calibration.Dummy
|
|
|
|
return db.Calibration.get_from_id(calibration_id, conn)
|
|
|
|
|
|
@app.context_processor
|
|
def inject_stage_and_region():
|
|
conn = db.get()
|
|
return dict(calibration=get_calibration(conn), leds=config.LEDS_UUIDS, CalibrationState=db.CalibrationState)
|
|
|
|
|
|
@app.before_request
|
|
def manage_auto_use_last_calibration():
|
|
if config.AUTO_USE_LAST_CALIBRATION and 'calibration_id' not in session:
|
|
conn = db.get()
|
|
last = db.Calibration.get_last(conn)
|
|
if last is not None:
|
|
session['calibration_id'] = last.id
|
|
|
|
|
|
@app.route("/")
|
|
def index():
|
|
conn = db.get()
|
|
objects = db.Object.all(conn)
|
|
return render_template('index.html', objects=objects)
|
|
|
|
|
|
@app.route("/create-object/", methods=["POST"])
|
|
def create_object():
|
|
conn = db.get()
|
|
with conn:
|
|
db.Object.create(request.form.get('name'), conn)
|
|
return redirect('/')
|
|
|
|
|
|
@app.route('/object/<id>')
|
|
def object(id: int):
|
|
conn = db.get()
|
|
object = db.Object.get_from_id(id, conn).full(conn)
|
|
return render_template('object.html', object=object)
|
|
|
|
|
|
@app.route('/scan/<id>')
|
|
def scan(id: int):
|
|
conn = db.get()
|
|
calibration_id = session.get('calibration_id', None)
|
|
object = db.Object.get_from_id(id, conn)
|
|
|
|
if calibration_id is None:
|
|
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
|
|
|
|
return render_template('scan.html', object=object)
|
|
|
|
|
|
@app.route('/scan-acquisition/<id>')
|
|
def scan_existing(id: int):
|
|
conn = db.get()
|
|
calibration_id = session.get('calibration_id', None)
|
|
acquisition = db.Acquisition.get_from_id(id, conn)
|
|
object = acquisition.object(conn)
|
|
|
|
if calibration_id is None:
|
|
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
|
|
|
|
return render_template('scan.html', object=object, acquisition=acquisition)
|
|
|
|
|
|
@app.route("/calibrate/")
|
|
def calibrate():
|
|
conn = db.get()
|
|
if 'calibration_id' not in session:
|
|
with conn:
|
|
calibration = db.Calibration.create(conn)
|
|
session['calibration_id'] = calibration.id
|
|
else:
|
|
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
|
|
|
|
if calibration.state in [db.CalibrationState.Empty, db.CalibrationState.HasData]:
|
|
return render_template('calibrate.html')
|
|
else:
|
|
return render_template('calibration.html', calibration=calibration)
|
|
|
|
|
|
@app.route("/new-calibration")
|
|
def new_calibration():
|
|
conn = db.get()
|
|
with conn:
|
|
calibration = db.Calibration.create(conn)
|
|
session['calibration_id'] = calibration.id
|
|
|
|
return redirect('/calibrate')
|
|
|
|
|
|
@app.route("/cancel-calibration")
|
|
def cancel_calibration():
|
|
conn = db.get()
|
|
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
|
|
calibration.state = db.CalibrationState.HasData
|
|
with conn:
|
|
calibration.save(conn)
|
|
return redirect('/calibrate')
|
|
|
|
|
|
@app.route("/api/preview/<id>")
|
|
def preview(id: int):
|
|
conn = db.get()
|
|
db.Object.get_from_id(id, conn)
|
|
capture_uuid = uuid.uuid4()
|
|
if scanner.capture(join(config.DATA_DIR, str(id), 'previews', str(capture_uuid) + '.jpg')):
|
|
return str(capture_uuid)
|
|
else:
|
|
return "Impossible de capturer l'image.", 500
|
|
|
|
|
|
@app.route("/api/scan-for-calibration")
|
|
def scan_calibration():
|
|
conn = db.get()
|
|
|
|
if 'calibration_id' not in session:
|
|
with conn:
|
|
calibration = db.Calibration.create(conn)
|
|
calibration_id = str(calibration.id)
|
|
session['calibration_id'] = calibration.id
|
|
else:
|
|
calibration_id = str(session['calibration_id'])
|
|
calibration = get_calibration(conn)
|
|
|
|
def generate():
|
|
length = len(config.LEDS_UUIDS)
|
|
for index, led_uuid in enumerate(scanner.scan(join(config.CALIBRATION_DIR, calibration_id))):
|
|
yield f"{led_uuid},{(index+1)/length}\n"
|
|
|
|
with conn:
|
|
calibration.state = db.CalibrationState.HasData
|
|
calibration.save(conn)
|
|
|
|
return app.response_class(generate(), mimetype='text/plain')
|
|
|
|
|
|
@app.route("/api/scan-for-object/<object_id>")
|
|
def scan_object(object_id: int):
|
|
conn = db.get()
|
|
calibration_id = session.get('calibration_id', None)
|
|
|
|
if calibration_id is None:
|
|
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
|
|
|
|
object = db.Object.get_from_id(object_id, conn)
|
|
|
|
if object is None:
|
|
raise RuntimeError(f"Aucun objet d'id {object_id}")
|
|
|
|
with conn:
|
|
acquisition = object.add_acquisition(calibration_id, conn)
|
|
|
|
def generate():
|
|
yield str(acquisition.id)
|
|
length = len(config.LEDS_UUIDS)
|
|
for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
|
|
yield f"{led_uuid},{(index+1)/length}\n"
|
|
|
|
return app.response_class(generate(), mimetype='text/plain')
|
|
|
|
|
|
@app.route("/api/scan-for-acquisition/<acquisition_id>")
|
|
def scan_acquisition(acquisition_id: int):
|
|
conn = db.get()
|
|
calibration_id = session.get('calibration_id', None)
|
|
|
|
if calibration_id is None:
|
|
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
|
|
|
|
acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
|
|
|
|
if acquisition is None:
|
|
raise RuntimeError(f"Aucun acquisition d'id {acquisition_id}")
|
|
|
|
object = acquisition.object(conn)
|
|
|
|
def generate():
|
|
length = len(config.LEDS_UUIDS)
|
|
for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
|
|
yield f"{led_uuid},{(index+1)/length}\n"
|
|
|
|
return app.response_class(generate(), mimetype='text/plain')
|
|
|
|
|
|
@app.route("/validate-acquisition/<acquisition_id>")
|
|
def validate_acquisition(acquisition_id: int):
|
|
conn = db.get()
|
|
acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
|
|
|
|
if acquisition is None:
|
|
raise f"Aucune acquisition d'id {acquisition_id}"
|
|
|
|
object = acquisition.object(conn)
|
|
|
|
acquisition.validated = True
|
|
with conn:
|
|
acquisition.save(conn)
|
|
|
|
return redirect(f'/object/{object.id}')
|
|
|
|
|
|
@app.route('/api/use-last-calibration')
|
|
def use_last_calibration():
|
|
conn = db.get()
|
|
calibration = db.Calibration.get_last(conn)
|
|
session['calibration_id'] = calibration.id
|
|
return 'ok'
|
|
|
|
|
|
@app.route("/api/calibrate")
|
|
def run_calibration():
|
|
conn = db.get()
|
|
id = session['calibration_id']
|
|
calib = db.Calibration.get_from_id(id, conn)
|
|
if calib is None:
|
|
return 'oops', 404
|
|
|
|
calibration_json = calibration.calibrate(join(config.CALIBRATION_DIR, str(id)))
|
|
with open(join(config.CALIBRATION_DIR, str(id), 'calibration.json'), 'w') as f:
|
|
json.dump(calibration_json, f, indent=4)
|
|
with conn:
|
|
calib.state = db.CalibrationState.IsComputed
|
|
calib.save(conn)
|
|
|
|
return 'ok'
|
|
|
|
|
|
@app.route('/validate-calibration')
|
|
def validate_calibration():
|
|
conn = db.get()
|
|
calib = get_calibration(conn)
|
|
if calib is None:
|
|
return 'oops', 404
|
|
|
|
with conn:
|
|
calib.validate(conn)
|
|
|
|
return redirect('/')
|
|
|
|
|
|
@app.route('/static/<path:path>')
|
|
def send_static(path):
|
|
return send_from_directory('static', path)
|
|
|
|
|
|
@app.route('/data/<path:path>')
|
|
def send_data(path):
|
|
return send_from_directory('data', path)
|