This commit is contained in:
Thomas Forgione 2024-08-23 10:17:44 +02:00
parent 38a92d4a73
commit d3f1253376
3 changed files with 437 additions and 337 deletions

View File

@ -0,0 +1,287 @@
from flask import Flask, redirect, request, render_template, send_from_directory, session
import json
import os
from os.path import join
import sqlite3
from . import db, config, scanner, calibration, archive
app = Flask(__name__)
# Manage secret key
try:
from . import secret
app.config['SECRET_KEY'] = secret.SECRET_KEY
except ImportError:
# Secret key file does not exist, create it
secret = os.urandom(50).hex()
with open('secret.py', 'w') as f:
f.write(f'SECRET_KEY = "{secret}"')
app.config['SECRET_KEY'] = secret
def get_calibration(conn: sqlite3.Connection) -> db.Calibration:
calibration_id = session.get('calibration_id', None)
if calibration_id is None:
return db.Calibration.Dummy
return db.Calibration.get_from_id(calibration_id, conn)
@app.context_processor
def inject_stage_and_region():
conn = db.get()
return dict(calibration=get_calibration(conn), leds=config.LEDS_UUIDS, CalibrationState=db.CalibrationState)
@app.before_request
def manage_auto_use_last_calibration():
if config.AUTO_USE_LAST_CALIBRATION and 'calibration_id' not in session:
conn = db.get()
last = db.Calibration.get_last(conn)
if last is not None:
session['calibration_id'] = last.id
@app.route("/")
def index():
conn = db.get()
projects = db.Object.all_by_project(conn)
return render_template('index.html', projects=projects)
@app.route("/create-object/", methods=["POST"])
def create_object():
conn = db.get()
with conn:
db.Object.create(request.form.get('name'), request.form.get('project'), conn)
return redirect('/')
@app.route('/object/<id>')
def object(id: int):
conn = db.get()
object = db.Object.get_from_id(id, conn).full(conn)
return render_template('object.html', object=object)
@app.route('/delete-object/<id>')
def delete_object(id: int):
conn = db.get()
with conn:
db.Object.delete_from_id(id, conn)
return redirect('/')
@app.route('/scan/<id>')
def scan(id: int):
conn = db.get()
calibration_id = session.get('calibration_id', None)
object = db.Object.get_from_id(id, conn)
if calibration_id is None:
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
return render_template('scan.html', object=object, calibrated=True)
@app.route('/scan-acquisition/<id>')
def scan_existing(id: int):
conn = db.get()
calibrated = session.get('calibration_id', None) is not None
acquisition = db.Acquisition.get_from_id(id, conn)
object = acquisition.object(conn)
return render_template('scan.html', object=object, acquisition=acquisition, calibrated=calibrated)
@app.route("/calibrate/")
def calibrate():
conn = db.get()
if 'calibration_id' not in session:
with conn:
calibration = db.Calibration.create(conn)
session['calibration_id'] = calibration.id
else:
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
if calibration.state in [db.CalibrationState.Empty, db.CalibrationState.HasData]:
return render_template('calibrate.html')
else:
return render_template('calibration.html', calibration=calibration)
@app.route("/new-calibration")
def new_calibration():
conn = db.get()
with conn:
calibration = db.Calibration.create(conn)
session['calibration_id'] = calibration.id
return redirect('/calibrate')
@app.route("/cancel-calibration")
def cancel_calibration():
conn = db.get()
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
calibration.state = db.CalibrationState.HasData
with conn:
calibration.save(conn)
return redirect('/calibrate')
@app.route("/api/scan-for-calibration")
def scan_calibration():
conn = db.get()
if 'calibration_id' not in session:
with conn:
calibration = db.Calibration.create(conn)
calibration_id = str(calibration.id)
session['calibration_id'] = calibration.id
else:
calibration_id = str(session['calibration_id'])
calibration = get_calibration(conn)
def generate():
length = len(config.LEDS_UUIDS)
for index, led_uuid in enumerate(scanner.scan(join(config.CALIBRATION_DIR, calibration_id))):
yield f"{led_uuid},{(index+1)/length}\n"
with conn:
calibration.state = db.CalibrationState.HasData
calibration.save(conn)
return app.response_class(generate(), mimetype='text/plain')
@app.route("/api/scan-for-object/<object_id>")
def scan_object(object_id: int):
conn = db.get()
calibration_id = session.get('calibration_id', None)
if calibration_id is None:
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
object = db.Object.get_from_id(object_id, conn)
if object is None:
raise RuntimeError(f"Aucun objet d'id {object_id}")
with conn:
acquisition = object.add_acquisition(calibration_id, conn)
def generate():
yield str(acquisition.id)
length = len(config.LEDS_UUIDS)
for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
yield f"{led_uuid},{(index+1)/length}\n"
return app.response_class(generate(), mimetype='text/plain')
@app.route("/api/scan-for-acquisition/<acquisition_id>")
def scan_acquisition(acquisition_id: int):
conn = db.get()
calibration_id = session.get('calibration_id', None)
if calibration_id is None:
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
if acquisition is None:
raise RuntimeError(f"Aucun acquisition d'id {acquisition_id}")
object = acquisition.object(conn)
def generate():
length = len(config.LEDS_UUIDS)
for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
yield f"{led_uuid},{(index+1)/length}\n"
return app.response_class(generate(), mimetype='text/plain')
@app.route("/validate-acquisition/<acquisition_id>")
def validate_acquisition(acquisition_id: int):
conn = db.get()
acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
if acquisition is None:
raise f"Aucune acquisition d'id {acquisition_id}"
object = acquisition.object(conn)
acquisition.validated = True
with conn:
acquisition.save(conn)
return redirect(f'/object/{object.id}')
@app.route("/delete-acquisition/<acquisition_id>")
def delete_acquisition(acquisition_id: int):
conn = db.get()
with conn:
acqusition = db.Acquisition.delete_from_id(acquisition_id, conn)
return redirect('/object/' + str(acqusition.object_id))
@app.route('/use-last-calibration')
def use_last_calibration():
conn = db.get()
calibration = db.Calibration.get_last(conn)
session['calibration_id'] = calibration.id
return redirect('/calibrate')
@app.route('/api/use-last-calibration')
def api_use_last_calibration():
conn = db.get()
calibration = db.Calibration.get_last(conn)
session['calibration_id'] = calibration.id
return 'ok'
@app.route("/api/calibrate")
def run_calibration():
conn = db.get()
id = session['calibration_id']
calib = db.Calibration.get_from_id(id, conn)
if calib is None:
return 'oops', 404
calibration_json = calibration.calibrate(join(config.CALIBRATION_DIR, str(id)))
with open(join(config.CALIBRATION_DIR, str(id), 'calibration.json'), 'w') as f:
json.dump(calibration_json, f, indent=4)
with conn:
calib.state = db.CalibrationState.IsComputed
calib.save(conn)
return 'ok'
@app.route('/validate-calibration')
def validate_calibration():
conn = db.get()
calib = get_calibration(conn)
if calib is None:
return 'oops', 404
with conn:
calib.validate(conn)
return redirect('/')
@app.route('/static/<path:path>')
def send_static(path):
return send_from_directory('static', path)
@app.route('/data/<path:path>')
def send_data(path):
return send_from_directory('data', path)
app.register_blueprint(archive.bp)

335
app.py
View File

@ -1,335 +0,0 @@
#!/usr/bin/env python
from flask import Flask, redirect, request, render_template, send_from_directory, session
import itertools
import json
import os
from os.path import join
import sqlite3
from . import db, config, scanner, calibration, archive
app = Flask(__name__)
# Manage secret key
try:
from . import secret
app.config['SECRET_KEY'] = secret.SECRET_KEY
except ImportError:
# Secret key file does not exist, create it
secret = os.urandom(50).hex()
with open('secret.py', 'w') as f:
f.write(f'SECRET_KEY = "{secret}"')
app.config['SECRET_KEY'] = secret
def get_calibration(conn: sqlite3.Connection) -> db.Calibration:
calibration_id = session.get('calibration_id', None)
if calibration_id is None:
return db.Calibration.Dummy
return db.Calibration.get_from_id(calibration_id, conn)
@app.context_processor
def inject_stage_and_region():
conn = db.get()
return dict(calibration=get_calibration(conn), leds=config.LEDS_UUIDS, CalibrationState=db.CalibrationState)
@app.before_request
def manage_auto_use_last_calibration():
if config.AUTO_USE_LAST_CALIBRATION and 'calibration_id' not in session:
conn = db.get()
last = db.Calibration.get_last(conn)
if last is not None:
session['calibration_id'] = last.id
@app.route("/")
def index():
conn = db.get()
projects = db.Object.all_by_project(conn)
return render_template('index.html', projects=projects)
@app.route("/create-object/", methods=["POST"])
def create_object():
conn = db.get()
with conn:
db.Object.create(request.form.get('name'), request.form.get('project'), conn)
return redirect('/')
@app.route('/object/<id>')
def object(id: int):
conn = db.get()
object = db.Object.get_from_id(id, conn).full(conn)
return render_template('object.html', object=object)
@app.route('/delete-object/<id>')
def delete_object(id: int):
conn = db.get()
with conn:
db.Object.delete_from_id(id, conn)
return redirect('/')
@app.route('/scan/<id>')
def scan(id: int):
conn = db.get()
calibration_id = session.get('calibration_id', None)
object = db.Object.get_from_id(id, conn)
if calibration_id is None:
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
return render_template('scan.html', object=object, calibrated=True)
@app.route('/scan-acquisition/<id>')
def scan_existing(id: int):
conn = db.get()
calibrated = session.get('calibration_id', None) is not None
acquisition = db.Acquisition.get_from_id(id, conn)
object = acquisition.object(conn)
return render_template('scan.html', object=object, acquisition=acquisition, calibrated=calibrated)
@app.route("/calibrate/")
def calibrate():
conn = db.get()
if 'calibration_id' not in session:
with conn:
calibration = db.Calibration.create(conn)
session['calibration_id'] = calibration.id
else:
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
if calibration.state in [db.CalibrationState.Empty, db.CalibrationState.HasData]:
return render_template('calibrate.html')
else:
return render_template('calibration.html', calibration=calibration)
@app.route("/new-calibration")
def new_calibration():
conn = db.get()
with conn:
calibration = db.Calibration.create(conn)
session['calibration_id'] = calibration.id
return redirect('/calibrate')
@app.route("/cancel-calibration")
def cancel_calibration():
conn = db.get()
calibration = db.Calibration.get_from_id(session['calibration_id'], conn)
calibration.state = db.CalibrationState.HasData
with conn:
calibration.save(conn)
return redirect('/calibrate')
@app.route("/api/scan-for-calibration")
def scan_calibration():
conn = db.get()
if 'calibration_id' not in session:
with conn:
calibration = db.Calibration.create(conn)
calibration_id = str(calibration.id)
session['calibration_id'] = calibration.id
else:
calibration_id = str(session['calibration_id'])
calibration = get_calibration(conn)
def generate():
length = len(config.LEDS_UUIDS)
for index, led_uuid in enumerate(scanner.scan(join(config.CALIBRATION_DIR, calibration_id))):
yield f"{led_uuid},{(index+1)/length}\n"
with conn:
calibration.state = db.CalibrationState.HasData
calibration.save(conn)
return app.response_class(generate(), mimetype='text/plain')
@app.route("/api/scan-for-object/<object_id>")
def scan_object(object_id: int):
conn = db.get()
calibration_id = session.get('calibration_id', None)
if calibration_id is None:
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
object = db.Object.get_from_id(object_id, conn)
if object is None:
raise RuntimeError(f"Aucun objet d'id {object_id}")
with conn:
acquisition = object.add_acquisition(calibration_id, conn)
def generate():
yield str(acquisition.id)
length = len(config.LEDS_UUIDS)
for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
yield f"{led_uuid},{(index+1)/length}\n"
return app.response_class(generate(), mimetype='text/plain')
@app.route("/api/scan-for-acquisition/<acquisition_id>")
def scan_acquisition(acquisition_id: int):
conn = db.get()
calibration_id = session.get('calibration_id', None)
if calibration_id is None:
raise RuntimeError("Impossible de faire l'acquisition sans étalonnage")
acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
if acquisition is None:
raise RuntimeError(f"Aucun acquisition d'id {acquisition_id}")
object = acquisition.object(conn)
def generate():
length = len(config.LEDS_UUIDS)
for index, led_uuid in enumerate(scanner.scan(join(config.OBJECT_DIR, str(object.id), str(acquisition.id)))):
yield f"{led_uuid},{(index+1)/length}\n"
return app.response_class(generate(), mimetype='text/plain')
@app.route("/validate-acquisition/<acquisition_id>")
def validate_acquisition(acquisition_id: int):
conn = db.get()
acquisition = db.Acquisition.get_from_id(acquisition_id, conn)
if acquisition is None:
raise f"Aucune acquisition d'id {acquisition_id}"
object = acquisition.object(conn)
acquisition.validated = True
with conn:
acquisition.save(conn)
return redirect(f'/object/{object.id}')
@app.route("/delete-acquisition/<acquisition_id>")
def delete_acquisition(acquisition_id: int):
conn = db.get()
with conn:
acqusition = db.Acquisition.delete_from_id(acquisition_id, conn)
return redirect('/object/' + str(acqusition.object_id))
@app.route('/use-last-calibration')
def use_last_calibration():
conn = db.get()
calibration = db.Calibration.get_last(conn)
session['calibration_id'] = calibration.id
return redirect('/calibrate')
@app.route('/api/use-last-calibration')
def api_use_last_calibration():
conn = db.get()
calibration = db.Calibration.get_last(conn)
session['calibration_id'] = calibration.id
return 'ok'
@app.route("/api/calibrate")
def run_calibration():
conn = db.get()
id = session['calibration_id']
calib = db.Calibration.get_from_id(id, conn)
if calib is None:
return 'oops', 404
calibration_json = calibration.calibrate(join(config.CALIBRATION_DIR, str(id)))
with open(join(config.CALIBRATION_DIR, str(id), 'calibration.json'), 'w') as f:
json.dump(calibration_json, f, indent=4)
with conn:
calib.state = db.CalibrationState.IsComputed
calib.save(conn)
return 'ok'
@app.route('/validate-calibration')
def validate_calibration():
conn = db.get()
calib = get_calibration(conn)
if calib is None:
return 'oops', 404
with conn:
calib.validate(conn)
return redirect('/')
def download_object(id: int, tar: archive.ArchiveSender):
conn = db.get()
object = db.Object.get_from_id(id, conn).full(conn)
# Group acquisitions sharing calibration
def keyfunc(x: db.Calibration) -> int:
return x.calibration_id
acquisitions_sorted = sorted(object.acquisitions, key=keyfunc)
acquisitions_grouped = [
(db.Calibration.get_from_id(k, conn), list(g))
for k, g in itertools.groupby(acquisitions_sorted, key=keyfunc)
]
# Create archive file to send
for calibration_index, (calib, acquisitions) in enumerate(acquisitions_grouped):
calibration_dir = join(config.CALIBRATION_DIR, str(calib.id))
# Add calibration images
for image in os.listdir(calibration_dir):
tar.add_file(
f'object/{calibration_index}/calibration/{image}',
join(calibration_dir, image)
)
# Add each acquisition
for acquisition_index, acquisition in enumerate(acquisitions):
acquisition_dir = join(config.OBJECT_DIR, str(object.id), str(acquisition.id))
for image in os.listdir(acquisition_dir):
tar.add_file(
f'object/{calibration_index}/{acquisition_index}/{image}',
join(acquisition_dir, image)
)
return tar.response()
@app.route('/download-object/tar/<id>')
def download_object_tar(id: int):
return download_object(id, archive.TarSender())
@app.route('/download-object/zip/<id>')
def download_object_zip(id: int):
return download_object(id, archive.ZipSender())
@app.route('/static/<path:path>')
def send_static(path):
return send_from_directory('static', path)
@app.route('/data/<path:path>')
def send_data(path):
return send_from_directory('data', path)

View File

@ -1,11 +1,14 @@
import builtins
from datetime import datetime
from flask import Response
from flask import Response, Blueprint
import functools
import itertools
import os
from os.path import join
import zlib
from typing import Optional
import time
from . import db, config
# Chunks for crc 32 computation
CRC32_CHUNK_SIZE = 65_536
@ -21,6 +24,13 @@ ZERO = ord('0')
def tar_header_chunk(filename: str, filepath: str) -> bytes:
"""
Returns the 512 bytes header for a tar file in a tar archive.
Args:
filename (str): path of where the file will be in the archive.
filepath (str): path of where the file is currently on the disk.
"""
# Returns the octal representation without the initial
def oct(i: int) -> str:
@ -67,25 +77,58 @@ def tar_header_chunk(filename: str, filepath: str) -> bytes:
class ArchiveSender:
"""
Helper class to send archives over the network.
This class is abstract, and needs to be derived by specific archive sender classes.
"""
def __init__(self):
"""
Creates a new archive sender.
"""
self.files: dict[str, str] = {}
def add_file(self, filename: str, filepath: str):
"""
Adds a file to the archive.
Args:
filename (str): path of where the file will be in the archive.
filepath (str): path of where the file is currently on the disk.
"""
self.files[filename] = filepath
def content_length(self) -> Optional[int]:
"""
Returns the size of the archive if it is computable beforehand, none otherwise.
"""
return None
def generator(self):
"""
Returns a generator that yields the bytes of the archive.
"""
raise NotImplementedError("Abstract method")
def mime_type(self) -> str:
"""
Returns the mime type of the archive.
"""
raise NotImplementedError("Abstract method")
def archive_name(self) -> str:
"""
Returns the name of the archive.
This method is useful for web applications where the archive will be downloaded.
"""
raise NotImplementedError("Abstract method")
def response(self):
def response(self) -> Response:
"""
Returns a flask reponse for the archive.
"""
headers = {'Content-Disposition': f'attachment; filename="{self.archive_name()}"'}
length = self.content_length()
@ -100,6 +143,10 @@ class ArchiveSender:
class TarSender(ArchiveSender):
"""
A sender for tar archives computed on the fly.
"""
def generator(self):
def generate():
for name, file in self.files.items():
@ -141,6 +188,12 @@ class TarSender(ArchiveSender):
def crc32(filename) -> int:
"""
Computes the CRC32 checksum for the file.
Args:
filename (str): path to the file of which the CRC32 needs to be computed.
"""
with open(filename, 'rb') as fh:
hash = 0
while True:
@ -152,6 +205,17 @@ def crc32(filename) -> int:
def zip_local_file_header(filename: str, filepath: str, crc: int) -> bytes:
"""
Generates the bytes for the local file header of the file.
Args:
filename (str): path of where the file will be in the archive.
filepath (str): path of where the file is currently on the disk.
crc (int):
the CRC 32 checksum of the file. It is not computed by this function because it is also required in the
central directory file header, so the user of this function should compute it beforehand, and reuse it later
to avoid computing it twice.
"""
buffer_size = 30 + len(filename)
buffer = bytearray(buffer_size)
stat = os.stat(filepath)
@ -192,6 +256,18 @@ def zip_local_file_header(filename: str, filepath: str, crc: int) -> bytes:
def zip_central_directory_file_header(filename: str, filepath: str, crc: int, offset: int) -> bytes:
"""
Generates the bytes for the central directory file header of the file.
Args:
filename (str): path of where the file will be in the archive.
filepath (str): path of where the file is currently on the disk.
crc (int):
the CRC 32 checksum of the file. It is not computed by this function because it is also required in the
local file header, so the user of this function should compute it beforehand, and reuse it later to avoid
computing it twice.
offset (int): number of bytes where the file starts.
"""
buffer_size = 46 + len(filename)
buffer = bytearray(buffer_size)
stat = os.stat(filepath)
@ -246,6 +322,14 @@ def zip_central_directory_file_header(filename: str, filepath: str, crc: int, of
def zip_end_of_central_directory(items_number: int, central_directory_size: int, central_directory_offset: int):
"""
Generates the bytes for the end of central directory of the archive.
Args:
items_number (int): number of files in the archive.
central_directory_size (int): size in bytes of the central directory.
central_directory_offset (int): number of the byte where the central directory starts.
"""
buffer = bytearray(22)
# Field 1: End of central directory signature = 0x06054b50 (buffer[0:4])
buffer[0:4] = b'\x50\x4b\x05\x06'
@ -273,6 +357,10 @@ def zip_end_of_central_directory(items_number: int, central_directory_size: int,
class ZipSender(ArchiveSender):
"""
A sender for zip archives computed on the fly.
"""
def generator(self):
def generate():
local_offsets = dict()
@ -329,3 +417,63 @@ class ZipSender(ArchiveSender):
def archive_name(self) -> str:
return 'archive.zip'
def download_object(id: int, archive: ArchiveSender):
"""
Helper for routes that send archives.
"""
conn = db.get()
object = db.Object.get_from_id(id, conn).full(conn)
# Group acquisitions sharing calibration
def keyfunc(x: db.Calibration) -> int:
return x.calibration_id
acquisitions_sorted = sorted(object.acquisitions, key=keyfunc)
acquisitions_grouped = [
(db.Calibration.get_from_id(k, conn), list(g))
for k, g in itertools.groupby(acquisitions_sorted, key=keyfunc)
]
# Create archive file to send
for calibration_index, (calib, acquisitions) in enumerate(acquisitions_grouped):
calibration_dir = join(config.CALIBRATION_DIR, str(calib.id))
# Add calibration images
for image in os.listdir(calibration_dir):
archive.add_file(
f'object/{calibration_index}/calibration/{image}',
join(calibration_dir, image)
)
# Add each acquisition
for acquisition_index, acquisition in enumerate(acquisitions):
acquisition_dir = join(config.OBJECT_DIR, str(object.id), str(acquisition.id))
for image in os.listdir(acquisition_dir):
archive.add_file(
f'object/{calibration_index}/{acquisition_index}/{image}',
join(acquisition_dir, image)
)
return archive.response()
blueprint = Blueprint('archive', __name__)
@blueprint.route('/download-object/tar/<id>')
def download_object_tar(id: int):
"""
Downloads an object as a tar archive.
"""
return download_object(id, TarSender())
@blueprint.route('/download-object/zip/<id>')
def download_object_zip(id: int):
"""
Downloads an object as a zip archive.
"""
return download_object(id, ZipSender())