"""Streamed, uncompressed tar / zip archive helpers for Flask responses."""

import builtins
from datetime import datetime
import functools
import os
import zlib

from flask import Response

# Chunks for crc 32 computation
CRC32_CHUNK_SIZE = 65_536
# 4MiB chunks
CHUNK_SIZE = 4_194_304
# Tar archives are made of 512-byte records
TAR_BLOCK_SIZE = 512
# ASCII value for space
SPACE = ord(' ')
# ASCII value for zero
ZERO = ord('0')


def _octal_field(value: int, width: int) -> bytes:
    """Return *value* as exactly *width* zero-padded ASCII octal digits.

    Raises:
        ValueError: if *value* is negative or needs more than *width* digits.
            Writing a longer string into the header slice would silently
            resize the header and corrupt the archive.
    """
    if value < 0:
        raise ValueError(f'negative value {value} cannot be stored in a tar header')
    digits = format(value, 'o')
    if len(digits) > width:
        raise ValueError(
            f'value {value} does not fit in {width} octal digits of a tar header'
        )
    return digits.rjust(width, '0').encode('ascii')


def tar_header_chunk(filename: str, filepath: str) -> bytes:
    """Build the 512-byte tar header record for one regular file.

    Args:
        filename: name stored in the archive (ASCII, at most 100 bytes).
        filepath: path of the file on disk; its ``os.stat`` result supplies
            mode, owner, group, size and modification time.

    Returns:
        The 512-byte header.

    Raises:
        ValueError: if the name is longer than 100 bytes or not ASCII
            (``UnicodeEncodeError`` is a ``ValueError``), or if a numeric
            value does not fit its header field.
        OSError: if *filepath* cannot be stat'ed.
    """
    stat = os.stat(filepath)
    name = filename.encode('ascii')
    if len(name) > 100:
        raise ValueError(
            f'file name is {len(name)} bytes long, tar headers allow at most 100'
        )

    buffer = bytearray(TAR_BLOCK_SIZE)

    # Field 1: filename on 100 bytes
    buffer[0:len(name)] = name

    # Field 2: mode, on 8 bytes, octal, last byte must be \x00, so we set only
    # the first 7 bytes. Only the permission bits are stored, not the file type.
    buffer[100:107] = _octal_field(stat.st_mode & 0o7777, 7)

    # Field 3: owner, on 8 bytes, octal, last byte must be \x00
    buffer[108:115] = _octal_field(stat.st_uid, 7)

    # Field 4: group, on 8 bytes, octal, last byte must be \x00
    buffer[116:123] = _octal_field(stat.st_gid, 7)

    # Field 5: file size in bytes, on 12 bytes, octal, last byte must be \x00
    buffer[124:135] = _octal_field(stat.st_size, 11)

    # Field 6: last modified, on 12 bytes, octal, last byte must be \x00.
    # Timestamps before the epoch cannot be represented and are clamped to 0.
    buffer[136:147] = _octal_field(max(int(stat.st_mtime), 0), 11)

    # Field 7: checksum, we fill it at the end

    # Field 8: type flag, 0 because we only have regular files
    buffer[156] = ZERO

    # Field 9: linkname, \x00s because we only have regular files

    # POSIX 1003.1-1990: 255 empty bytes

    # Compute the checksum: the sum of all header bytes, with the 8 bytes of
    # the checksum field itself counted as spaces (32 * 8 = 256). Those bytes
    # are still zero at this point, hence the explicit + 256.
    checksum = sum(buffer) + 8 * SPACE
    buffer[148:154] = format(checksum, '06o').encode('ascii')
    # The checksum field is laid out as six octal digits, a \x00 and a space:
    # byte 154 is already \x00, so only the trailing space is written.
    buffer[155] = SPACE

    return bytes(buffer)


class ArchiveSender:
    """Base class collecting the files to send as a single archive."""

    def __init__(self):
        # Maps the name stored in the archive to the path of the file on disk
        self.files: dict[str, str] = {}

    def add_file(self, filename: str, filepath: str):
        """Register *filepath* under the archive name *filename*.

        Adding the same *filename* twice replaces the previous path.
        """
        self.files[filename] = filepath

    def response(self):
        """Build the response streaming the archive; subclasses override this."""
        raise NotImplementedError("Abstract method")


class TarSender(ArchiveSender):
    """Sends the registered files as a streamed tar archive."""

    def response(self):
        """Return a Flask ``Response`` streaming ``archive.tar``."""

        def generate():
            for name, path in self.files.items():
                yield tar_header_chunk(name, path)

                bytes_sent = 0
                with open(path, 'rb') as f:
                    while chunk := f.read(CHUNK_SIZE):
                        bytes_sent += len(chunk)
                        yield chunk

                # Because tar uses records of 512 bytes, we need to pad the
                # file with zeroes to fill the last record. Nothing is added
                # when the size is already a multiple of 512 (or zero): an
                # extra all-zero record would look like an end-of-archive
                # marker to readers.
                padding = -bytes_sent % TAR_BLOCK_SIZE
                if padding:
                    yield b'\x00' * padding

            # End-of-archive marker: two records filled with zeroes
            yield b'\x00' * (2 * TAR_BLOCK_SIZE)

        return Response(
            generate(),
            mimetype='application/x-tar',
            headers={'Content-Disposition': 'attachment; filename="archive.tar"'}
        )


def crc32(filename):
    """Return the CRC-32 of the content of *filename*, read chunk by chunk."""
    checksum = 0
    with open(filename, 'rb') as fh:
        while chunk := fh.read(CRC32_CHUNK_SIZE):
            checksum = zlib.crc32(chunk, checksum)
    return checksum


def _dos_time_and_date(timestamp: float) -> tuple[int, int]:
    """Convert a POSIX timestamp to the (time, date) pair of 16-bit DOS values.

    The DOS format only covers years 1980 to 2107; timestamps outside that
    range are clamped to the nearest representable moment.
    """
    moment = datetime.fromtimestamp(timestamp)
    if moment.year < 1980:
        moment = datetime(1980, 1, 1)
    elif moment.year > 2107:
        moment = datetime(2107, 12, 31, 23, 59, 59)

    dos_time = (moment.second // 2) | (moment.minute << 5) | (moment.hour << 11)
    dos_date = moment.day | (moment.month << 5) | ((moment.year - 1980) << 9)
    return dos_time, dos_date


def zip_local_file_header(filename: str, filepath: str, fileindex: int) -> bytes:
    """Build the ZIP local file header for one stored (uncompressed) file.

    All multi-byte fields are written little-endian.

    Args:
        filename: name stored in the archive (ASCII).
        filepath: path of the file on disk; it is stat'ed and read in full to
            compute its CRC-32.
        fileindex: currently unused.

    Returns:
        The header: 30 fixed bytes followed by the file name.

    Raises:
        UnicodeEncodeError: if *filename* is not ASCII.
        OverflowError: if the file size does not fit in 4 bytes or the name
            length does not fit in 2 bytes.
        OSError: if *filepath* cannot be stat'ed or read.
    """
    name = filename.encode('ascii')
    stat = os.stat(filepath)
    buffer = bytearray(30 + len(name))

    # Field 1: local file header signature (buffer[0:4])
    buffer[0:4] = b'PK\x03\x04'

    # Field 2: version needed to extract (minimum) (buffer[4:6]): 1.0
    buffer[4:6] = (10).to_bytes(2, 'little')

    # Field 3: general purpose bit flag (buffer[6:8]), leave at 0

    # Field 4: compression mode (buffer[8:10]), leave at 0 (uncompressed)

    # Field 5: file last modification time (buffer[10:12]) and
    # date (buffer[12:14]), both in DOS format
    dos_time, dos_date = _dos_time_and_date(stat.st_mtime)
    buffer[10:12] = dos_time.to_bytes(2, 'little')
    buffer[12:14] = dos_date.to_bytes(2, 'little')

    # Field 6: crc-32 of uncompressed data (buffer[14:18])
    buffer[14:18] = crc32(filepath).to_bytes(4, 'little')

    # Field 7: compressed size (buffer[18:22]); same as the uncompressed size
    # because the data is stored as is
    buffer[18:22] = stat.st_size.to_bytes(4, 'little')

    # Field 8: uncompressed size (buffer[22:26])
    buffer[22:26] = stat.st_size.to_bytes(4, 'little')

    # Field 9: filename length (buffer[26:28])
    buffer[26:28] = len(name).to_bytes(2, 'little')

    # Field 10: extra field length (buffer[28:30]), leave at 0

    # Field 11: filename (buffer[30:30+len(filename)])
    buffer[30:30 + len(name)] = name

    return bytes(buffer)


class ZipSender(ArchiveSender):
    """Sends the registered files as a zip archive (not implemented yet)."""

    def response(self):
        """Return a Flask ``Response`` declared as ``archive.zip``.

        NOTE(review): the body is still a placeholder; it does not use
        ``self.files`` and does not emit a ZIP archive.
        """

        def generate():
            yield 'oops'

        return Response(
            generate(),
            mimetype='application/zip',
            headers={'Content-Disposition': 'attachment; filename="archive.zip"'}
        )