# Reconstructed from commit 11b08a9115019c3f4ec65a98a2ddb75fea93c089
# (Nicolas Bertrand, Mon Jan 22 12:54:56 2024 +0100, "Initial version")
# new file: check-dcp-long (mode 100755)
#!/usr/bin/env python3
"""Check the structure and integrity (SHA-1 hashes) of a DCP folder.

A DCP (Digital Cinema Package) is described by an ASSETMAP (Interop) or
ASSETMAP.xml (SMPTE) file listing one or more Packing Lists (PKL), which in
turn list every asset with its size and SHA-1 hash (SMPTE 429-8).
"""

import sys
import os
import os.path
import argparse
import base64
import hashlib
import logging as logger
from functools import partial
from urllib.parse import urlparse

from lxml import etree


class TdcpbException(Exception):
    """Base exception for every error raised by this tool."""

    def __init__(self, message, errors=None):
        Exception.__init__(self, message)
        # Bug fix: 'errors' was accepted but silently discarded; keep it so
        # callers can inspect the underlying detail.
        self.errors = errors


def list_all_files(p_path):
    """Return the sorted list of paths, relative to *p_path*, of every file
    and every *empty* directory found below *p_path*.

    Empty directories are included so that stray folders inside a DCP are
    reported as unexpected entries.
    """
    file_list = []
    file_size = 0
    folder_count = 0
    for root, sub_folders, files in os.walk(p_path):
        folder_count += len(sub_folders)
        # Record empty folders so they show up in the comparison with the
        # AssetMap content.
        for _folder in sub_folders:
            _f = os.path.join(root, _folder)
            if not os.listdir(_f):
                file_list.append(_f)
        # Record files and accumulate the total payload size (debug only).
        for _file in files:
            _f = os.path.join(root, _file)
            file_size += os.path.getsize(_f)
            file_list.append(_f)
    logger.debug("Total Size is {0} bytes".format(file_size))
    logger.debug("Total Files: {} ".format(len(file_list)))
    logger.debug("Total Folders: {}".format(folder_count))
    # Bug fix: the original used _w.split(p_path)[1] to relativize paths,
    # which breaks whenever the root path string happens to occur again
    # inside a file path; os.path.relpath is the robust equivalent.
    return sorted(os.path.relpath(_w, p_path) for _w in file_list)


def URItoPath(p_path):
    """Convert a (possibly file://) URI from an XML document into a plain
    relative filesystem path."""
    _parsed = urlparse(p_path.strip())
    _abs_path = ''.join([_parsed.netloc, _parsed.path])
    if _abs_path.startswith("/"):
        # remove leading "/"
        _abs_path = _abs_path[1:]
    return _abs_path


class DiError(TdcpbException):
    """Error raised while parsing or validating DCP metadata files."""

    def __init__(self, value):
        # Bug fix: the base Exception initializer was never called, leaving
        # the exception without args; keep 'value' for __str__ compatibility.
        TdcpbException.__init__(self, value)
        self.value = value

    def __str__(self):
        return repr(self.value)


class AssetmapParser(object):
    """Parse and minimally validate an ASSETMAP / ASSETMAP.xml file."""

    def __init__(self, p_xml_path):
        self.p_xml_path = p_xml_path
        try:
            self.tree = etree.parse(self.p_xml_path)
        except IOError as msg:
            _msg = "Parser Error in file {}: {}".format(self.p_xml_path, msg)
            raise DiError(_msg)
        except etree.XMLSyntaxError as _msg:
            _err = "File {}: {}".format(self.p_xml_path, _msg)
            raise DiError(_err)

        self.root = self.tree.getroot()
        self._GetNamespaces()
        self.assets = {}
        self._Verify()

    def Dump(self):
        pass

    def GetAllAssets(self):
        """Return {asset urn id: relative path} for every asset listed."""
        _assets = self.tree.findall('./am:AssetList/am:Asset',
                                    namespaces=self.ns)
        for _asset in _assets:
            _id = _asset.find('am:Id', namespaces=self.ns)
            _path = _asset.find("./am:ChunkList/am:Chunk/am:Path",
                                namespaces=self.ns)
            self.assets[_id.text] = URItoPath(_path.text)
        return self.assets

    def GetPkls(self):
        """Return the urn ids of the assets flagged as Packing Lists."""
        self.pkls = []
        _assets = self.tree.findall('./am:AssetList/am:Asset',
                                    namespaces=self.ns)
        for _asset in _assets:
            _pkl = _asset.find('am:PackingList', namespaces=self.ns)
            if (_pkl is not None):
                if (_pkl.text != "false"):
                    self.pkls.append(
                        _asset.find('am:Id', namespaces=self.ns).text)
        return self.pkls

    def _GetNamespaces(self):
        """ Get Namespaces """
        # Extract the default namespace from the root tag '{uri}Tag'.
        _ns = self.tree.getroot().tag[1:].split("}")[0]
        self.ns = {'am': _ns}
        logger.debug("Namespace is {}".format(self.ns))

    def _FindOne(self, p_tag, p_elem=None):
        """Find exactly one occurrence of *p_tag* (under *p_elem* or the
        whole tree) and return it; raise DiError otherwise."""
        if p_elem is not None:
            _elem = p_elem.findall(p_tag, namespaces=self.ns)
        else:
            _elem = self.tree.findall(p_tag, namespaces=self.ns)
        if len(_elem) != 1:
            _emsg = "Find tag {} {} times. Exepcted only one.".format(
                p_tag, len(_elem))
            raise DiError(_emsg)
        return _elem[0]

    def _Verify(self):
        """Check structural constraints the rest of the tool relies on:
        a single volume, a non-empty AssetList, one Chunk per asset and at
        least one PKL entry."""
        for _tag in self.root.findall('am:VolumeCount', namespaces=self.ns):
            _count = int(_tag.text)
            if _count > 1:
                _emsg = "The tool does not support more the one VOLINDEX"
                raise DiError(_emsg)

        _tag = './am:AssetList'
        _asset_list = self._FindOne(_tag)
        _tag = './am:Asset'
        _assets = _asset_list.findall(_tag, namespaces=self.ns)
        if not _assets:
            _emsg = "No Asset found. Invalid AssetMap"
            raise DiError(_emsg)

        _pkl_count = 0
        for _asset in _assets:
            # check Id presence
            _tag = './am:Id'
            _id = self._FindOne(_tag, _asset)
            _tag = './am:PackingList'
            _pkl = _asset.find(_tag, namespaces=self.ns)
            if (_pkl is not None):
                if (_pkl.text != "false"):
                    _pkl_count += 1
            _tag = './am:ChunkList'
            _chunk_list = self._FindOne(_tag, _asset)
            _tag = './am:Chunk'
            _chunk = _chunk_list.findall(_tag, namespaces=self.ns)
            if not _chunk:
                _emsg = " No Chunk found"
                raise DiError(_emsg)
            if len(_chunk) > 1:
                _emsg = "Tool doesn't handle segmentation (i.e. several vol index)"
                raise DiError(_emsg)
            _tag = './am:Path'
            _path = self._FindOne(_tag, _chunk[0])
        if _pkl_count == 0:
            _emsg = "No PKL found."
            raise DiError(_emsg)


class PklParser(AssetmapParser):
    """Parse and validate a Packing List (PKL) file.

    Reuses the AssetmapParser machinery (_FindOne, constructor flow) with a
    'pkl' namespace prefix and PKL-specific verification.
    """

    def __init__(self, p_xml_path, pkl_urn_id):
        self.pkl_data = {}
        # Id this PKL is expected to carry, as declared in the AssetMap.
        self.pkl_urn_id = pkl_urn_id
        AssetmapParser.__init__(self, p_xml_path)

    def _GetNamespaces(self):
        """ Get Namespaces """
        _ns = self.tree.getroot().tag[1:].split("}")[0]
        self.ns = {'pkl': _ns}

    def GetAssets(self):
        """Return {asset urn id: dict} with mandatory keys Hash/Size/Type
        and optional OriginalFileName/AnnotationText."""
        _assets = self.tree.findall('./pkl:AssetList/pkl:Asset',
                                    namespaces=self.ns)
        for _asset in _assets:
            _asset_dict = {}
            # parse mandatory tags
            _tag = 'Id'
            _id = _asset.find('pkl:{}'.format(_tag), namespaces=self.ns)

            for _tag in ('Hash', 'Size', 'Type'):
                _elem = _asset.find('pkl:{}'.format(_tag),
                                    namespaces=self.ns)
                _asset_dict[_tag] = _elem.text

            # parse optional tags
            for _tag in ('OriginalFileName', 'AnnotationText'):
                _elem = _asset.find('pkl:{}'.format(_tag),
                                    namespaces=self.ns)
                if _elem is not None:
                    _asset_dict[_tag] = _elem.text
            self.assets[_id.text] = _asset_dict
        return self.assets

    def DumpPkl(self):
        """Print the PKL header data gathered by _Verify()."""
        if self.pkl_data is None:
            return
        _pkl_str = "Packing List data\n"
        for _tag in ("Id", "IssueDate", "Issuer", "Creator"):
            _pkl_str += "{:<30}: {}\n".format(_tag, self.pkl_data[_tag])
        print(_pkl_str)

    def _Verify(self):
        """Check mandatory PKL header tags and that the PKL Id matches the
        Id announced by the AssetMap."""
        _tag = 'Id'
        self.pkl_data[_tag] = self._FindOne('./pkl:{}'.format(_tag)).text
        if self.pkl_data[_tag] != self.pkl_urn_id:
            _msg = "Id of PKL did not match one in AssetMap"
            logger.error("ID in PKL: {} ID of PKL in AssetMap: {}".format(
                self.pkl_data[_tag], self.pkl_urn_id))
            raise DiError(_msg)
        for _tag in ('IssueDate', 'Issuer', 'Creator'):
            self.pkl_data[_tag] = self._FindOne(
                './pkl:{}'.format(_tag)).text

        _tag = 'AssetList'
        self._FindOne('./pkl:{}'.format(_tag))


class DiParser(object):
    """High-level DCP checker: file presence/size checks and hash checks."""

    def __init__(self, p_dcp_folder):
        if os.path.exists(p_dcp_folder):
            self.p_dcp_folder = p_dcp_folder
        else:
            _emsg = "Not a DCP folder: {}".format(p_dcp_folder)
            raise DiError(_emsg)
        # Name of the VOLINDEX file found ("" until isVolindexPresent()).
        self.volindex = ""

    def list_unexpected_files(self):
        """Return files present on disk but absent from the AssetMap; runs
        a relaxed check_files() first if needed."""
        if not hasattr(self, '_unexpected_files'):
            self.check_files(strict=False)
        return self._unexpected_files

    def list_dcp_files(self):
        """Return the sorted list of files the DCP is expected to contain
        (assets + ASSETMAP + VOLINDEX), or 0 on parse failure."""
        try:
            self.getAssetmap()
            self._assetmap_xml = AssetmapParser(self.assetmap_path)
        except DiError as msg:
            logger.error(msg)
            return 0
        self.am_assets = self._assetmap_xml.GetAllAssets()
        _dcp_files = list(self.am_assets.values())
        # insert assetmap file
        _dcp_files.append(os.path.basename(self.assetmap_path))
        # insert VOLINDEX as it is not listed in AssetMap
        # TODO Manage case of several VOLINDEX ie with or without .xml
        _dcp_files.append(self.volindex)
        return sorted(_dcp_files)

    def check_files(self, strict=True):
        """Verify DCP structure: VOLINDEX present, AssetMap and PKLs parse,
        every asset exists with the declared size, and no unexpected files.

        Returns the number of verified assets, or 0 on failure. With
        strict=False, unexpected extra files are only warnings.
        """
        _nb_assets = 0
        # 1st check presence of dummy VOLINDEX
        if not self.isVolindexPresent():
            _msg = "No VOLINDEX found in DCP folder({}) ".format(
                self.p_dcp_folder)
            logger.error(_msg)
            return 0
        try:
            self.getAssetmap()
            self._assetmap_xml = AssetmapParser(self.assetmap_path)
        except DiError as msg:
            logger.error(msg)
            return 0
        self.am_assets = self._assetmap_xml.GetAllAssets()
        logger.debug("Found {} assets".format(len(self.am_assets)))
        self.pkls = self._assetmap_xml.GetPkls()
        logger.debug("Found {} PKLS".format(len(self.pkls)))
        if len(self.pkls) == 0:
            _msg = "No PKL found. Bad DCP"
            logger.error(_msg)
            return 0
        for _pkl_urn_id in self.pkls:
            _pkl_path = os.path.join(self.p_dcp_folder,
                                     self.am_assets[_pkl_urn_id])
            logger.debug("Parsing PKL: {}".format(os.path.basename(_pkl_path)))
            try:
                _pkl_xml = PklParser(_pkl_path, _pkl_urn_id)
            except DiError as msg:
                logger.error(msg)
                return 0
            # Valid pkl file increasse asset counter
            _nb_assets += 1
            _msg = "Found : {} ".format(self.am_assets[_pkl_urn_id])
            logger.debug(_msg)

            _pkl_assets = _pkl_xml.GetAssets()
            try:
                _nb_assets += self._ExistsAssets(_pkl_assets)
            except DiError as _msg:
                logger.error(_msg)
                return 0
        if (_nb_assets != len(self.am_assets)):
            _msg = "Invalid number of assets,( {} in AssetMap, {} counted)"\
                .format(len(self.am_assets), _nb_assets)
            # Bug fix: this message was built but never reported and the
            # mismatch was silently ignored; treat it as a check failure.
            logger.error(_msg)
            return 0

        # check presence of uneeded files
        _dcp_files = self.list_dcp_files()
        _dir_files = list_all_files(self.p_dcp_folder)

        self._unexpected_files = []
        # Hoisted: the set difference was previously computed twice.
        _unexpected = list(set(_dir_files) - set(_dcp_files))
        if len(_unexpected) > 0:
            _msg = "Unexpected files or dir present in DCP folder {}".format(
                self.p_dcp_folder)
            if (strict):
                logger.error(_msg)
                logger.error("Unexpected files or dir : ")
            else:
                logger.warning('Errors transformed in warning (relaxed check):')
                logger.warning(_msg)
                logger.warning("Unexpected files or dir : ")
            self._unexpected_files = _unexpected
            for _f in _unexpected:
                if (strict):
                    logger.error("- {}".format(_f))
                else:
                    logger.warning("- {}".format(_f))

            _msg = "Files in directory = {} Expected files in DCP = {}".format(
                len(_dir_files), len(_dcp_files))
            if (strict):
                logger.error(_msg)
                return 0
            else:
                logger.warning(_msg)

        if len(list(set(_dcp_files) - set(_dir_files))) > 0:
            # This should likely not happen as file existence got checked before.
            _msg = "DCP {} contains less file then expected : files in dir = {} expected files ={} "\
                .format(self.p_dcp_folder, len(_dir_files), len(_dcp_files))
            logger.error(_msg)
            return 0
        else:
            logger.debug("files in DCP and files in Assetmap are coherent")

        return _nb_assets

    def check_hash(self):
        """Verify the SHA-1 hash of every asset against its PKL entry.

        Returns "OK" on success, "KO" on any failure.
        """
        _nb_assets = 0
        # 1st check presence of dummy VOLINDEX
        if not self.isVolindexPresent():
            _msg = "No VOLINDEX found in DCP folder({}) ".format(
                self.p_dcp_folder)
            logger.error(_msg)
            return "KO"
        try:
            self.getAssetmap()
            self._assetmap_xml = AssetmapParser(self.assetmap_path)
        except DiError as msg:
            logger.error(msg)
            return "KO"
        self.am_assets = self._assetmap_xml.GetAllAssets()
        logger.debug("Found {} assets".format(len(self.am_assets)))
        self.pkls = self._assetmap_xml.GetPkls()
        if len(self.pkls) == 0:
            _msg = "No PKL found. Bad DCP"
            logger.error(_msg)
            return "KO"
        for _pkl_urn_id in self.pkls:
            _pkl_path = os.path.join(self.p_dcp_folder,
                                     self.am_assets[_pkl_urn_id])
            logger.debug("Parsing PKL: {}".format(os.path.basename(_pkl_path)))
            try:
                _pkl_xml = PklParser(_pkl_path, _pkl_urn_id)
            except DiError as msg:
                logger.error(msg)
                return "KO"
            # Valid pkl file increasse asset counter
            _nb_assets += 1
            _msg = "Found : {} ".format(self.am_assets[_pkl_urn_id])
            logger.debug(_msg)

            _pkl_assets = _pkl_xml.GetAssets()
            try:
                self._ExistsAssets(_pkl_assets)
                self._VerifyHash()
            except DiError as _msg:
                logger.error(_msg)
                return "KO"
        return "OK"

    def Ingest(self):
        """Parse the AssetMap and dump/verify each PKL's assets."""
        self.getAssetmap()
        self._assetmap_xml = AssetmapParser(self.assetmap_path)
        self.am_assets = self._assetmap_xml.GetAllAssets()
        self.pkls = self._assetmap_xml.GetPkls()
        for _pkl in self.pkls:
            _pkl_path = os.path.join(self.p_dcp_folder, self.am_assets[_pkl])
            # Bug fix: PklParser requires the expected PKL urn id; the
            # original single-argument call raised TypeError.
            _pkl_xml = PklParser(_pkl_path, _pkl)
            _pkl_assets = _pkl_xml.GetAssets()
            _pkl_xml.DumpPkl()
            self._VerifyAssets(_pkl_assets)

    def getAssetmap(self):
        """Locate ASSETMAP (Interop) or ASSETMAP.xml (SMPTE) and record its
        path in self.assetmap_path; raise DiError if absent."""
        _assetmap = os.path.join(self.p_dcp_folder, "ASSETMAP")
        if os.path.isfile(_assetmap):
            logger.debug("The DCP is in interop format")
            self.assetmap_path = _assetmap
            return
        _assetmap = os.path.join(self.p_dcp_folder, "ASSETMAP.xml")
        if os.path.isfile(_assetmap):
            logger.debug("The DCP is in SMPTE format")
            self.assetmap_path = _assetmap
            return
        _emsg = "No ASSETMAP file found"
        raise DiError(_emsg)

    def isAssetmap(self, p_file):
        """Return True if *p_file* is the AssetMap file name."""
        return p_file in ("ASSETMAP", "ASSETMAP.xml")

    def isVolindexPresent(self):
        """Record and return whether a VOLINDEX / VOLINDEX.xml exists."""
        if os.path.exists(os.path.join(self.p_dcp_folder, "VOLINDEX")):
            self.volindex = "VOLINDEX"
            return True
        if os.path.exists(os.path.join(self.p_dcp_folder, "VOLINDEX.xml")):
            self.volindex = "VOLINDEX.xml"
            return True
        return False

    def _ExistsAssets(self, p_pkl_assets):
        """Check every PKL asset is in the AssetMap, exists on disk and has
        the declared size. Populates self.assets (adds 'Path' to each entry)
        and returns the number of verified assets; raises DiError on any
        failure."""
        self.assets = {}
        for _k, _v in p_pkl_assets.items():
            if _k not in self.am_assets:
                _msg = "Asset with id {} not found in assetmap".format(_k)
                raise DiError(_msg)
            _asset = _v
            _path = os.path.join(self.p_dcp_folder, self.am_assets[_k])
            if os.path.exists(_path):
                _asset['Path'] = _path
                _msg = "Found : {} ".format(_path[len(self.p_dcp_folder):])
                logger.debug(_msg)
            else:
                _msg = "Asset {} not in DCP directory".format(
                    _path[len(self.p_dcp_folder):])
                raise DiError(_msg)
            if os.stat(_path).st_size != int(_v['Size']):
                _msg = "Asset {} has wrong size".format(
                    _path[len(self.p_dcp_folder):])
                logger.error("stat size = {}, Size in PKL = {}".format(
                    os.stat(_path).st_size, _v['Size']))
                raise DiError(_msg)
            self.assets[_k] = _asset
        return len(self.assets)

    def _VerifyHash(self):
        """Compare the computed SHA-1 of every asset in self.assets against
        its PKL 'Hash'; raise DiError on the first mismatch."""
        for _k, _v in self.assets.items():
            _msg = "Checking hash of {}".format(_v['Path'])
            logger.debug(_msg)
            _sum = self._HashSum(_v['Path'])
            if _sum == _v['Hash']:
                _msg = " Hash verification OK ({})".format(_v['Path'])
                logger.debug(_msg)
            else:
                _msg = " Hash verification failed for file {} \
CALC SUM = {}\n EXPT SUM = {} ".format(_v['Path'], _sum, _v['Hash'])
                logger.error(_msg)
                raise DiError(_msg)

    def _HashSum(self, p_filepath):
        """ check SHA1 of DCP files.
        As defined in SMPTE 429-8-2007 section 6.3"""
        _sha1 = hashlib.sha1()
        # 10 MiB chunks keep memory bounded on multi-GB assets.
        with open(p_filepath, 'rb') as _f:
            for _buff in iter(partial(_f.read, 10 * 1024**2), b''):
                _sha1.update(_buff)
        return base64.b64encode(_sha1.digest()).decode('ASCII')


def tdcpb_check_files(p_dcp_folder, strict=True):
    '''
    Check the contents of a DCP folder.
    If strict is False, the check will succeed when errors are fixable (e.g.
    presence of unexpected files).
    '''
    _dcp_folder = os.path.abspath(p_dcp_folder)
    logger.info('File check started for {}'
                .format(os.path.basename(_dcp_folder)))
    # do some basic check
    if not os.path.exists(_dcp_folder):
        _msg = "dcp directory {} does not exist".format(_dcp_folder)
        raise TdcpbException(_msg)
    # TODO : why not use normpath ?
    try:
        DCP = DiParser(_dcp_folder)
        _nb = DCP.check_files(strict=strict)
    except DiError as _err:
        raise TdcpbException(_err)
    if _nb == 0:
        _err = "DCP {} not well formed ".format(os.path.basename(_dcp_folder))
        raise TdcpbException(_err)
    logger.info('File check OK for {}'
                .format(os.path.basename(_dcp_folder)))


def tdcpb_check_hash(p_dcp_folder):
    '''
    Check integrity (checksum verification) of a DCP.
    This usually takes a long time.
    '''
    logger.info("Hash Check started for {}"
                .format(os.path.basename(p_dcp_folder)))
    # do some basic check
    if not os.path.exists(p_dcp_folder):
        _msg = "dcp directory {} does not exist".format(p_dcp_folder)
        raise TdcpbException(_msg)

    _dcp_folder = os.path.abspath(p_dcp_folder)
    print("{}".format(_dcp_folder))
    try:
        DCP = DiParser(_dcp_folder)
        _res = DCP.check_hash()
    except DiError as _err:
        raise TdcpbException(_err)
    if _res != 'OK':
        _err = "DCP hash verfication failed"
        print("Verfication DCP ERREUR : {}".
              format(os.path.basename(_dcp_folder)))
        raise TdcpbException(_err)
    print("Verfication DCP OK : {}".format(os.path.basename(_dcp_folder)))
    logger.info("Hash OK for {}".format(os.path.basename(_dcp_folder)))


def tdcpb_check(p_dcp_folder, p_check_type=u"short"):
    """Dispatch to the quick structure check ("short") or the full hash
    check ("long"); raise TdcpbException on unknown type."""
    if (p_check_type == u"short"):
        tdcpb_check_files(p_dcp_folder)
    elif (p_check_type == u"long"):
        tdcpb_check_hash(p_dcp_folder)
    else:
        _err = "unknow verfication type:{}".format(p_check_type)
        logger.error(_err)
        raise TdcpbException(_err)


def main(argv):
    """Command-line entry point: run a long (hash) check on the given DCP.
    Returns 0 on success, 1 on error."""
    parser = argparse.ArgumentParser(
        description='DCP intgrity(hash) verification')
    parser.add_argument('dcp_path',
                        metavar='DCP_PATH',
                        type=str,
                        nargs="?",
                        help='DCP path')
    # parser.add_argument('-d', '--debug', dest='debug', action='store_const',
    #                     const=logging.DEBUG, default=logging.INFO,
    #                     help='debug mode')

    args = parser.parse_args()

    if not args.dcp_path:
        logger.error("No DCP source")
        return 1
    try:
        tdcpb_check(args.dcp_path, u'long')
    except TdcpbException as _err:
        logger.error(_err)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))