378 lines
11 KiB
Python
Executable File
378 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
from flask import Flask, redirect, request, render_template, send_from_directory, session
|
|
import itertools
|
|
import io
|
|
import json
|
|
import os
|
|
from os.path import join
|
|
import sqlite3
|
|
import uuid
|
|
import tarfile
|
|
from . import db, config, scanner, calibration
|
|
|
|
app = Flask(__name__)
|
|
|
|
# Manage secret key
|
|
try:
|
|
from . import secret
|
|
app.config['SECRET_KEY'] = secret.SECRET_KEY
|
|
except ImportError:
|
|
# Secret key file does not exist, create it
|
|
secret = os.urandom(50).hex()
|
|
with open('secret.py', 'w') as f:
|
|
f.write(f'SECRET_KEY = "{secret}"')
|
|
app.config['SECRET_KEY'] = secret
|
|
|
|
|
|
def get_calibration(conn: sqlite3.Connection) -> db.Calibration:
|
|
calibration_id = session.get('calibration_id', None)
|
|
if calibration_id is None:
|
|
return db.Calibration.Dummy
|
|
|
|
return db.Calibration.get_from_id(calibration_id, conn)
|
|
|
|
|
|
@app.context_processor
|
|
def inject_stage_and_region():
|
|
conn = db.get()
|
|
return dict(calibration=get_calibration(conn), leds=config.LEDS_UUIDS, CalibrationState=db.CalibrationState)
|
|
|
|
|
|
@app.before_request
|
|
def manage_auto_use_last_calibration():
|
|
if config.AUTO_USE_LAST_CALIBRATION and 'calibration_id' not in session:
|
|
conn = db.get()
|
|
last = db.Calibration.get_last(conn)
|
|
if last is not None:
|
|
session['calibration_id'] = last.id
|
|
|
|
|
|
@app.route("/")
|
|
def index():
|
|
conn = db.get()
|
|
objects = db.Object.all(conn)
|
|
return render_template('index.html', objects=objects)
|
|
|
|
|
|
@app.route("/create-object/", methods=["POST"])
|
|
def create_object():
|
|
conn = db.get()
|
|
with conn:
|
|
db.Object.create(request.form.get('name'), conn)
|
|
return redirect('/')
|
|
|
|
|
|
@app.route('/object/<id>')
|
|
def object(id: int):
|
|
conn = db.get()
|
|
object = db.Object.get_from_id(id, conn).full(conn)
|
|
return render_template('object.html', object=object)
|
|
|
|
|
|
@app.route('/scan/<id>')
|
|
def scan(id: int):
|
|
conn = db.get()
|
|
calibration_id = session.get('calibration_id', None)
|
|
object = db.Object.get_from_id(id, conn)
|
|
|
|
if calibration_id is None:
|
|
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
|
|
|
|
return render_template('scan.html', object=object)
|
|
|
|
|
|
@app.route('/scan-acquisition/<id>')
|
|
def scan_existing(id: int):
|
|
conn = db.get()
|
|
calibration_id = session.get('calibration_id', None)
|
|
acquisition = db.Acquisition.get_from_id(id, conn)
|
|
object = acquisition.object(conn)
|
|
|
|
if calibration_id is None:
|
|
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
|
|
|
|
return render_template('scan.html', object=object, acquisition=acquisition)
|
|
|
|
|
|
@app.route("/calibrate/")
|
|
def calibrate():
|
|
conn = db.get()
|
|
if 'calibration_id' not in session:
|
|
with conn:
|
|
calibration = db.Calibration.create(conn)
|
|
session['calibration_id'] = calibration.id
|
|
else:
|
|
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
|
|
|
|
if calibration.state in [db.CalibrationState.Empty, db.CalibrationState.HasData]:
|
|
return render_template('calibrate.html')
|
|
else:
|
|
return render_template('calibration.html', calibration=calibration)
|
|
|
|
|
|
@app.route("/new-calibration")
|
|
def new_calibration():
|
|
conn = db.get()
|
|
with conn:
|
|
calibration = db.Calibration.create(conn)
|
|
session['calibration_id'] = calibration.id
|
|
|
|
return redirect('/calibrate')
|
|
|
|
|
|
@app.route("/cancel-calibration")
|
|
def cancel_calibration():
|
|
conn = db.get()
|
|
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
|
|
calibration.state = db.CalibrationState.HasData
|
|
with conn:
|
|
calibration.save(conn)
|
|
return redirect('/calibrate')
|
|
|
|
|
|
@app.route("/api/preview/<id>")
|
|
def preview(id: int):
|
|
conn = db.get()
|
|
db.Object.get_from_id(id, conn)
|
|
capture_uuid = uuid.uuid4()
|
|
if scanner.capture(join(config.DATA_DIR, str(id), 'previews', str(capture_uuid) + '.jpg')):
|
|
return str(capture_uuid)
|
|
else:
|
|
return "Impossible de capturer l'image.", 500
|
|
|
|
|
|
@app.route("/api/scan-for-calibration")
|
|
def scan_calibration():
|
|
conn = db.get()
|
|
|
|
if 'calibration_id' not in session:
|
|
with conn:
|
|
calibration = db.Calibration.create(conn)
|
|
calibration_id = str(calibration.id)
|
|
session['calibration_id'] = calibration.id
|
|
else:
|
|
calibration_id = str(session['calibration_id'])
|
|
calibration = get_calibration(conn)
|
|
|
|
def generate():
|
|
length = len(config.LEDS_UUIDS)
|
|
for index, led_uuid in enumerate(scanner.scan(join(config.CALIBRATION_DIR, calibration_id))):
|
|
yield f"{led_uuid},{(index+1)/length}\n"
|
|
|
|
with conn:
|
|
calibration.state = db.CalibrationState.HasData
|
|
calibration.save(conn)
|
|
|
|
return app.response_class(generate(), mimetype='text/plain')
|
|
|
|
|
|
@app.route("/api/scan-for-object/<object_id>")
|
|
def scan_object(object_id: int):
|
|
conn = db.get()
|
|
calibration_id = session.get('calibration_id', None)
|
|
|
|
if calibration_id is None:
|
|
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
|
|
|
|
object = db.Object.get_from_id(object_id, conn)
|
|
|
|
if object is None:
|
|
raise RuntimeError(f"Aucun objet d'id {object_id}")
|
|
|
|
with conn:
|
|
acquisition = object.add_acquisition(calibration_id, conn)
|
|
|
|
def generate():
|
|
yield str(acquisition.id)
|
|
length = len(config.LEDS_UUIDS)
|
|
for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
|
|
yield f"{led_uuid},{(index+1)/length}\n"
|
|
|
|
return app.response_class(generate(), mimetype='text/plain')
|
|
|
|
|
|
@app.route("/api/scan-for-acquisition/<acquisition_id>")
|
|
def scan_acquisition(acquisition_id: int):
|
|
conn = db.get()
|
|
calibration_id = session.get('calibration_id', None)
|
|
|
|
if calibration_id is None:
|
|
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
|
|
|
|
acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
|
|
|
|
if acquisition is None:
|
|
raise RuntimeError(f"Aucun acquisition d'id {acquisition_id}")
|
|
|
|
object = acquisition.object(conn)
|
|
|
|
def generate():
|
|
length = len(config.LEDS_UUIDS)
|
|
for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
|
|
yield f"{led_uuid},{(index+1)/length}\n"
|
|
|
|
return app.response_class(generate(), mimetype='text/plain')
|
|
|
|
|
|
@app.route("/validate-acquisition/<acquisition_id>")
|
|
def validate_acquisition(acquisition_id: int):
|
|
conn = db.get()
|
|
acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
|
|
|
|
if acquisition is None:
|
|
raise f"Aucune acquisition d'id {acquisition_id}"
|
|
|
|
object = acquisition.object(conn)
|
|
|
|
acquisition.validated = True
|
|
with conn:
|
|
acquisition.save(conn)
|
|
|
|
return redirect(f'/object/{object.id}')
|
|
|
|
|
|
@app.route('/api/use-last-calibration')
|
|
def use_last_calibration():
|
|
conn = db.get()
|
|
calibration = db.Calibration.get_last(conn)
|
|
session['calibration_id'] = calibration.id
|
|
return 'ok'
|
|
|
|
|
|
@app.route("/api/calibrate")
|
|
def run_calibration():
|
|
conn = db.get()
|
|
id = session['calibration_id']
|
|
calib = db.Calibration.get_from_id(id, conn)
|
|
if calib is None:
|
|
return 'oops', 404
|
|
|
|
calibration_json = calibration.calibrate(join(config.CALIBRATION_DIR, str(id)))
|
|
with open(join(config.CALIBRATION_DIR, str(id), 'calibration.json'), 'w') as f:
|
|
json.dump(calibration_json, f, indent=4)
|
|
with conn:
|
|
calib.state = db.CalibrationState.IsComputed
|
|
calib.save(conn)
|
|
|
|
return 'ok'
|
|
|
|
|
|
@app.route('/validate-calibration')
|
|
def validate_calibration():
|
|
conn = db.get()
|
|
calib = get_calibration(conn)
|
|
if calib is None:
|
|
return 'oops', 404
|
|
|
|
with conn:
|
|
calib.validate(conn)
|
|
|
|
return redirect('/')
|
|
|
|
|
|
@app.route('/download-object/<id>')
|
|
def download_object(id: int):
|
|
conn = db.get()
|
|
object = db.Object.get_from_id(id, conn).full(conn)
|
|
|
|
# Group acquisitions sharing calibration
|
|
|
|
def keyfunc(x: db.Calibration) -> int:
|
|
return x.calibration_id
|
|
|
|
acquisitions_sorted = sorted(object.acquisitions, key=keyfunc)
|
|
acquisitions_grouped = [(db.Calibration.get_from_id(k, conn), list(g)) for k, g in itertools.groupby(acquisitions_sorted, key=keyfunc)]
|
|
|
|
def generate():
|
|
for calibration_index, (calib, acquisitions) in enumerate(acquisitions_grouped):
|
|
# Send each image
|
|
calibration_dir = join(config.CALIBRATION_DIR, str(calib.id))
|
|
for image in os.listdir(calibration_dir):
|
|
|
|
# Generate tar header for file
|
|
image_path = join(calibration_dir, image)
|
|
bytes = io.BytesIO()
|
|
stat = os.stat(image_path)
|
|
|
|
# Create dummy tar to extract tar header for file
|
|
with tarfile.open(fileobj=bytes, mode='w') as buffer:
|
|
tar_info = tarfile.TarInfo(image_path)
|
|
tar_info.name = f'object/{calibration_index}/calibration/{image}'
|
|
tar_info.size = stat.st_size
|
|
buffer.addfile(tar_info)
|
|
|
|
# Yield header
|
|
value = bytes.getvalue()
|
|
yield value[:512]
|
|
|
|
# Yield file content, by chunks of 4MiB
|
|
chunk_size = 4_194_304
|
|
bytes_len = 0
|
|
|
|
with open(image_path, 'rb') as file:
|
|
while True:
|
|
bytes = file.read(chunk_size)
|
|
|
|
if len(bytes) == 0:
|
|
break
|
|
|
|
bytes_len += len(bytes)
|
|
yield bytes
|
|
|
|
yield b'\x00' * (512 - bytes_len % 512)
|
|
|
|
for acquisition_index, acquisition in enumerate(acquisitions):
|
|
acquisition_dir = join(config.OBJECT_DIR, str(object.id), str(acquisition.id))
|
|
|
|
# Send each image
|
|
for image in os.listdir(acquisition_dir):
|
|
|
|
# Generate tar header for file
|
|
image_path = join(acquisition_dir, image)
|
|
bytes = io.BytesIO()
|
|
stat = os.stat(image_path)
|
|
|
|
# Create dummy tar to extract tar header for file
|
|
with tarfile.open(fileobj=bytes, mode='w') as buffer:
|
|
tar_info = tarfile.TarInfo(image_path)
|
|
tar_info.name = f'object/{calibration_index}/{acquisition_index}/{image}'
|
|
tar_info.size = stat.st_size
|
|
buffer.addfile(tar_info)
|
|
|
|
# Yield header
|
|
value = bytes.getvalue()
|
|
yield value[:512]
|
|
|
|
# Yield file content, by chunks of 4MiB
|
|
chunk_size = 4_194_304
|
|
bytes_len = 0
|
|
|
|
with open(image_path, 'rb') as file:
|
|
while True:
|
|
bytes = file.read(chunk_size)
|
|
|
|
if len(bytes) == 0:
|
|
break
|
|
|
|
bytes_len += len(bytes)
|
|
yield bytes
|
|
|
|
yield b'\x00' * (512 - bytes_len % 512)
|
|
|
|
return app.response_class(
|
|
generate(),
|
|
mimetype='application/x-tar',
|
|
headers={'Content-Disposition': 'attachment; filename="archive.tar"'}
|
|
)
|
|
|
|
|
|
@app.route('/static/<path:path>')
|
|
def send_static(path):
|
|
return send_from_directory('static', path)
|
|
|
|
|
|
@app.route('/data/<path:path>')
|
|
def send_data(path):
|
|
return send_from_directory('data', path)
|