#!/bin/python3
"""Check the structure and integrity of a DCP folder.

The module parses the ASSETMAP and the packing lists (PKL) found in a DCP
folder and offers two checks:

* a "short" check (:func:`tdcpb_check_files`) verifying that every asset
  listed exists with the announced size and that no unexpected file is
  present;
* a "long" check (:func:`tdcpb_check_hash`) which additionally verifies the
  SHA-1 checksum of every asset listed in the packing lists.
"""
import sys
import os.path
import argparse
import base64
import logging as logger
from urllib.parse import urlparse
import hashlib
from functools import partial

try:
    from lxml import etree
    _XML_PARSE_ERRORS = (etree.XMLSyntaxError,)
except ImportError:
    # Fall back to the standard library parser: the subset of the API used
    # here (parse / getroot / find / findall with namespaces) is the same.
    import xml.etree.ElementTree as etree
    _XML_PARSE_ERRORS = (etree.ParseError,)


class TdcpbException(Exception):
    """Base exception raised by the public ``tdcpb_*`` functions."""

    def __init__(self, message, errors=None):
        Exception.__init__(self, message)
        # Optional extra details supplied by the caller.
        self.errors = errors


def list_all_files(p_path):
    """Return the sorted list of files and empty folders found under p_path.

    Paths are returned relative to ``p_path``. Empty folders are listed so
    that they can be reported as unexpected content.
    """
    fileList = []
    fileSize = 0
    folderCount = 0
    for root, subFolders, files in os.walk(p_path):
        folderCount += len(subFolders)
        # add empty folder to fileList
        for _folder in subFolders:
            _f = os.path.join(root, _folder)
            if not os.listdir(_f):
                fileList.append(_f)
        # add files
        for _file in files:
            _f = os.path.join(root, _file)
            fileSize += os.path.getsize(_f)
            fileList.append(_f)
    logger.debug("Total Size is {0} bytes".format(fileSize))
    logger.debug("Total Files: {} ".format(len(fileList)))
    logger.debug("Total Folders: {}".format(folderCount))
    # relpath (rather than splitting on p_path and dropping one character)
    # stays correct when p_path ends with a separator or when its text
    # appears again deeper in the tree.
    return sorted(os.path.relpath(_w, p_path) for _w in fileList)


def URItoPath(p_path):
    """Convert the URI (or plain path) of an asset to a relative path.

    The network location and path components are concatenated and a single
    leading "/" is removed.
    """
    _parsed = urlparse(p_path.strip())
    _abs_path = ''.join([_parsed.netloc, _parsed.path])
    if _abs_path.startswith("/"):
        # remove leading "/"
        _abs_path = _abs_path[1:]
    return _abs_path


class DiError(TdcpbException):
    """Error raised while parsing or verifying a DCP."""

    def __init__(self, value):
        # Initialise the base class so that ``args`` is populated.
        TdcpbException.__init__(self, value)
        self.value = value

    def __str__(self):
        return repr(self.value)


class AssetmapParser(object):
    """Parse and validate an ASSETMAP file.

    Raises DiError from the constructor when the file cannot be read, is not
    well-formed XML or does not have the expected structure.
    """

    def __init__(self, p_xml_path):
        self.p_xml_path = p_xml_path
        try:
            self.tree = etree.parse(self.p_xml_path)
        except IOError as msg:
            _msg = "Parser Error in file {}: {}".format(self.p_xml_path, msg)
            raise DiError(_msg) from msg
        except _XML_PARSE_ERRORS as _msg:
            _err = "File {}: {}".format(self.p_xml_path, _msg)
            raise DiError(_err) from _msg
        self.root = self.tree.getroot()
        self._GetNamespaces()
        self.assets = {}
        self._Verify()

    def Dump(self):
        """Placeholder, does nothing."""
        pass

    def GetAllAssets(self):
        """Return a dict mapping each asset Id to its relative path."""
        _assets = self.tree.findall('./am:AssetList/am:Asset',
                                    namespaces=self.ns)
        for _asset in _assets:
            _id = _asset.find('am:Id', namespaces=self.ns)
            _path = _asset.find("./am:ChunkList/am:Chunk/am:Path",
                                namespaces=self.ns)
            self.assets[_id.text] = URItoPath(_path.text)
        return self.assets

    def GetPkls(self):
        """Return the Ids of the assets flagged as packing list."""
        self.pkls = []
        _assets = self.tree.findall('./am:AssetList/am:Asset',
                                    namespaces=self.ns)
        for _asset in _assets:
            _pkl = _asset.find('am:PackingList', namespaces=self.ns)
            if _pkl is not None and _pkl.text != "false":
                self.pkls.append(
                    _asset.find('am:Id', namespaces=self.ns).text)
        return self.pkls

    def _GetNamespaces(self):
        """ Get Namespaces """
        # The root tag has the form "{namespace}Name".
        _ns = self.tree.getroot().tag[1:].split("}")[0]
        self.ns = {'am': _ns}
        logger.debug("Namespace is {}".format(self.ns))

    def _FindOne(self, p_tag, p_elem=None):
        """Return the unique element matching p_tag.

        The search starts at p_elem when given, at the document otherwise.
        Raises DiError unless exactly one element matches.
        """
        _start = p_elem if p_elem is not None else self.tree
        _elem = _start.findall(p_tag, namespaces=self.ns)
        if len(_elem) != 1:
            _emsg = "Find tag {} {} times. Exepcted only one.".format(
                p_tag, len(_elem))
            raise DiError(_emsg)
        return _elem[0]

    def _Verify(self):
        """Validate the structure of the ASSETMAP, raise DiError if invalid."""
        for _tag in self.root.findall('am:VolumeCount', namespaces=self.ns):
            try:
                _count = int(_tag.text)
            except (TypeError, ValueError) as _err:
                # Report malformed content like any other ASSETMAP defect.
                _emsg = "Invalid VolumeCount value: {!r}".format(_tag.text)
                raise DiError(_emsg) from _err
            if _count > 1:
                _emsg = "The tool does not support more the one VOLINDEX"
                raise DiError(_emsg)
        _asset_list = self._FindOne('./am:AssetList')
        _assets = _asset_list.findall('./am:Asset', namespaces=self.ns)
        if not _assets:
            _emsg = "No Asset found. Invalid AssetMap"
            raise DiError(_emsg)
        _pkl_count = 0
        for _asset in _assets:
            # check Id presence
            self._FindOne('./am:Id', _asset)
            _pkl = _asset.find('./am:PackingList', namespaces=self.ns)
            if _pkl is not None and _pkl.text != "false":
                _pkl_count += 1
            _chunk_list = self._FindOne('./am:ChunkList', _asset)
            _chunk = _chunk_list.findall('./am:Chunk', namespaces=self.ns)
            if not _chunk:
                _emsg = " No Chunk found"
                raise DiError(_emsg)
            if len(_chunk) > 1:
                _emsg = ("Tool doesn't handle segmentation "
                         "(i.e. several vol index)")
                raise DiError(_emsg)
            # check Path presence
            self._FindOne('./am:Path', _chunk[0])
        if _pkl_count == 0:
            _emsg = "No PKL found."
            raise DiError(_emsg)


class PklParser(AssetmapParser):
    """Parse and validate a packing list (PKL) file.

    pkl_urn_id is the Id announced for this PKL in the ASSETMAP; the
    constructor raises DiError when the Id found in the file differs.
    """

    # Child tags of a PKL asset copied into the dict built by GetAssets.
    _MANDATORY_TAGS = ('Hash', 'Size', 'Type')
    _OPTIONAL_TAGS = ('OriginalFileName', 'AnnotationText')
    # Top level tags stored in pkl_data.
    _HEADER_TAGS = ('Id', 'IssueDate', 'Issuer', 'Creator')

    def __init__(self, p_xml_path, pkl_urn_id):
        self.pkl_data = {}
        self.pkl_urn_id = pkl_urn_id
        AssetmapParser.__init__(self, p_xml_path)

    def _GetNamespaces(self):
        """ Get Namespaces """
        _ns = self.tree.getroot().tag[1:].split("}")[0]
        self.ns = {'pkl': _ns}

    def GetAssets(self):
        """Return a dict mapping each asset Id to a dict of its properties.

        Raises DiError when an asset lacks its Id or a mandatory tag.
        """
        _assets = self.tree.findall('./pkl:AssetList/pkl:Asset',
                                    namespaces=self.ns)
        for _asset in _assets:
            _id = _asset.find('pkl:Id', namespaces=self.ns)
            if _id is None:
                _emsg = "Asset without Id in PKL {}".format(self.p_xml_path)
                raise DiError(_emsg)
            _asset_dict = {}
            # parse mandatory tags
            for _tag in self._MANDATORY_TAGS:
                _elem = _asset.find('pkl:{}'.format(_tag),
                                    namespaces=self.ns)
                if _elem is None:
                    _emsg = "Missing tag {} for asset {} in PKL {}".format(
                        _tag, _id.text, self.p_xml_path)
                    raise DiError(_emsg)
                _asset_dict[_tag] = _elem.text
            # parse optional tags
            for _tag in self._OPTIONAL_TAGS:
                _elem = _asset.find('pkl:{}'.format(_tag),
                                    namespaces=self.ns)
                if _elem is not None:
                    _asset_dict[_tag] = _elem.text
            self.assets[_id.text] = _asset_dict
        return self.assets

    def DumpPkl(self):
        """Print the header data (Id, IssueDate, Issuer, Creator)."""
        if not self.pkl_data:
            return
        _pkl_str = "Packing List data\n"
        for _tag in self._HEADER_TAGS:
            _pkl_str += "{:<30}: {}\n".format(_tag, self.pkl_data[_tag])
        print(_pkl_str)

    def _Verify(self):
        """Validate the PKL header and store it in pkl_data."""
        _tag = 'Id'
        self.pkl_data[_tag] = self._FindOne('./pkl:{}'.format(_tag)).text
        if self.pkl_data[_tag] != self.pkl_urn_id:
            _msg = "Id of PKL did not match one in AssetMap"
            logger.error("ID in PKL: {} ID of PKL in AssetMap: {}".format(
                self.pkl_data[_tag], self.pkl_urn_id))
            raise DiError(_msg)
        for _tag in ('IssueDate', 'Issuer', 'Creator'):
            self.pkl_data[_tag] = self._FindOne('./pkl:{}'.format(_tag)).text
        # check AssetList presence
        self._FindOne('./pkl:AssetList')


class DiParser(object):
    """Check the content of a DCP folder against its ASSETMAP and PKLs."""

    def __init__(self, p_dcp_folder):
        if os.path.exists(p_dcp_folder):
            self.p_dcp_folder = p_dcp_folder
        else:
            _emsg = "Not a DCP folder: {}".format(p_dcp_folder)
            raise DiError(_emsg)
        self.volindex = ""

    def list_unexpected_files(self):
        """Return the files of the folder which are not part of the DCP.

        Runs a relaxed check_files() first when no check was done yet.
        Returns an empty list when the check could not reach that step.
        """
        if not hasattr(self, '_unexpected_files'):
            self.check_files(strict=False)
        return getattr(self, '_unexpected_files', [])

    def list_dcp_files(self):
        """Return the sorted list of files expected in the DCP folder.

        Returns 0 when the ASSETMAP cannot be found or parsed.
        """
        try:
            self._LoadAssetmap()
        except DiError as msg:
            logger.error(msg)
            return 0
        # Make sure the VOLINDEX name is known even when no check ran before.
        self.isVolindexPresent()
        return self._ExpectedFiles()

    def check_files(self, strict=True):
        """Check presence and size of the assets of the DCP.

        Returns the number of assets counted, or 0 when the check fails.
        When strict is False, unexpected files are only reported as warnings.
        """
        if not self._PrepareCheck():
            return 0
        _nb_assets = 0
        for _pkl_urn_id in self.pkls:
            try:
                _pkl_assets = self._LoadPklAssets(_pkl_urn_id)
                # The valid PKL file itself counts as one asset.
                _nb_assets += 1 + self._ExistsAssets(_pkl_assets)
            except DiError as _msg:
                logger.error(_msg)
                return 0
        if _nb_assets != len(self.am_assets):
            # Reported only: this mismatch never made the check fail.
            _msg = "Invalid number of assets,( {} in AssetMap, {} counted)"\
                .format(len(self.am_assets), _nb_assets)
            logger.warning(_msg)

        # check presence of uneeded files
        _dcp_files = self._ExpectedFiles()
        _dir_files = list_all_files(self.p_dcp_folder)
        _unexpected = sorted(set(_dir_files) - set(_dcp_files))
        self._unexpected_files = _unexpected
        if _unexpected:
            _log = logger.error if strict else logger.warning
            if not strict:
                logger.warning(
                    'Errors transformed in warning (relaxed check):')
            _log("Unexpected files or dir present in DCP folder {}".format(
                self.p_dcp_folder))
            _log("Unexpected files or dir : ")
            for _f in _unexpected:
                _log("- {}".format(_f))
            _log("Files in directory = {} Expected files in DCP = {}".format(
                len(_dir_files), len(_dcp_files)))
            if strict:
                return 0

        if set(_dcp_files) - set(_dir_files):
            # This should likely not happen as file existence got checked
            # before.
            _msg = "DCP {} contains less file then expected : files in dir = {} expected files ={} "\
                .format(self.p_dcp_folder, len(_dir_files), len(_dcp_files))
            logger.error(_msg)
            return 0
        logger.debug("files in DCP and files in Assetmap are coherent")
        return _nb_assets

    def check_hash(self):
        """Check presence, size and hash of the assets of the DCP.

        Returns "OK" on success and "KO" on any failure.
        """
        if not self._PrepareCheck():
            return "KO"
        for _pkl_urn_id in self.pkls:
            try:
                _pkl_assets = self._LoadPklAssets(_pkl_urn_id)
                self._ExistsAssets(_pkl_assets)
                self._VerifyHash()
            except DiError as _msg:
                logger.error(_msg)
                return "KO"
        return "OK"

    def Ingest(self):
        """Dump every PKL of the DCP and verify its assets.

        Raises DiError when the DCP is invalid.
        """
        self._LoadAssetmap()
        for _pkl in self.pkls:
            _pkl_path = os.path.join(self.p_dcp_folder, self.am_assets[_pkl])
            # PklParser needs the Id announced by the ASSETMAP.
            _pkl_xml = PklParser(_pkl_path, _pkl)
            _pkl_assets = _pkl_xml.GetAssets()
            _pkl_xml.DumpPkl()
            # Verify the assets: presence and size, then checksum.
            self._ExistsAssets(_pkl_assets)
            self._VerifyHash()

    def getAssetmap(self):
        """Store the path of the ASSETMAP file in self.assetmap_path.

        Raises DiError when neither ASSETMAP nor ASSETMAP.xml exists.
        """
        for _name, _format in (("ASSETMAP", "interop"),
                               ("ASSETMAP.xml", "SMPTE")):
            _assetmap = os.path.join(self.p_dcp_folder, _name)
            if os.path.isfile(_assetmap):
                logger.debug("The DCP is in {} format".format(_format))
                self.assetmap_path = _assetmap
                return
        _emsg = "No ASSETMAP file found"
        raise DiError(_emsg)

    def isAssetmap(self, p_file):
        """Return True when p_file is an ASSETMAP file name."""
        return p_file in ("ASSETMAP", "ASSETMAP.xml")

    def isVolindexPresent(self):
        """Return True when a VOLINDEX file exists, storing its name."""
        for _name in ("VOLINDEX", "VOLINDEX.xml"):
            if os.path.exists(os.path.join(self.p_dcp_folder, _name)):
                self.volindex = _name
                return True
        return False

    def _LoadAssetmap(self):
        """Parse the ASSETMAP and cache its assets and PKL Ids.

        Raises DiError when the ASSETMAP is missing or invalid.
        """
        self.getAssetmap()
        self._assetmap_xml = AssetmapParser(self.assetmap_path)
        self.am_assets = self._assetmap_xml.GetAllAssets()
        self.pkls = self._assetmap_xml.GetPkls()

    def _PrepareCheck(self):
        """Run the steps shared by check_files and check_hash.

        Returns True when the checks can proceed; logs the error and
        returns False otherwise.
        """
        # 1st check presence of dummy VOLINDEX
        if not self.isVolindexPresent():
            _msg = "No VOLINDEX found in DCP folder({}) ".format(
                self.p_dcp_folder)
            logger.error(_msg)
            return False
        try:
            self._LoadAssetmap()
        except DiError as msg:
            logger.error(msg)
            return False
        logger.debug("Found {} assets".format(len(self.am_assets)))
        logger.debug("Found {} PKLS".format(len(self.pkls)))
        if not self.pkls:
            _msg = "No PKL found. Bad DCP"
            logger.error(_msg)
            return False
        return True

    def _LoadPklAssets(self, p_pkl_urn_id):
        """Parse the PKL with the given Id and return its assets.

        Raises DiError when the PKL is missing or invalid.
        """
        _pkl_path = os.path.join(self.p_dcp_folder,
                                 self.am_assets[p_pkl_urn_id])
        logger.debug("Parsing PKL: {}".format(os.path.basename(_pkl_path)))
        _pkl_xml = PklParser(_pkl_path, p_pkl_urn_id)
        logger.debug("Found : {} ".format(self.am_assets[p_pkl_urn_id]))
        return _pkl_xml.GetAssets()

    def _ExpectedFiles(self):
        """Return the sorted names of the files which make up the DCP."""
        _dcp_files = list(self.am_assets.values())
        # insert assetmap file
        _dcp_files.append(os.path.basename(self.assetmap_path))
        # insert VOLINDEX as it is not listed in AssetMap
        # TODO Manage case of several VOLINDEX ie with or without .xml
        if self.volindex:
            _dcp_files.append(self.volindex)
        return sorted(_dcp_files)

    def _ExistsAssets(self, p_pkl_assets):
        """Check that the assets of a PKL exist with the announced size.

        Stores the checked assets, completed with their 'Path', in
        self.assets and returns their count. Raises DiError on the first
        asset which is unknown, missing or of the wrong size.
        """
        self.assets = {}
        _prefix_len = len(self.p_dcp_folder)
        for _k, _v in p_pkl_assets.items():
            if _k not in self.am_assets:
                _msg = "Asset with id {} not found in assetmap".format(_k)
                raise DiError(_msg)
            _path = os.path.join(self.p_dcp_folder, self.am_assets[_k])
            if not os.path.exists(_path):
                _msg = "Asset {} not in DCP directory".format(
                    _path[_prefix_len:])
                raise DiError(_msg)
            _v['Path'] = _path
            logger.debug("Found : {} ".format(_path[_prefix_len:]))
            try:
                _expected_size = int(_v['Size'])
            except (TypeError, ValueError) as _err:
                _msg = "Asset {} has invalid Size in PKL: {!r}".format(
                    _path[_prefix_len:], _v['Size'])
                raise DiError(_msg) from _err
            _real_size = os.stat(_path).st_size
            if _real_size != _expected_size:
                _msg = "Asset {} has wrong size".format(_path[_prefix_len:])
                logger.error("stat size = {}, Size in PKL = {}".format(
                    _real_size, _v['Size']))
                raise DiError(_msg)
            self.assets[_k] = _v
        return len(self.assets)

    def _VerifyHash(self):
        """Compare the hash of each asset of self.assets with its PKL value.

        Raises DiError on the first mismatch.
        """
        for _v in self.assets.values():
            logger.debug("Checking hash of {}".format(_v['Path']))
            _sum = self._HashSum(_v['Path'])
            if _sum == _v['Hash']:
                logger.debug(
                    " Hash verification OK ({})".format(_v['Path']))
            else:
                _msg = (" Hash verification failed for file {} "
                        "CALC SUM = {}\n EXPT SUM = {} ").format(
                            _v['Path'], _sum, _v['Hash'])
                logger.error(_msg)
                raise DiError(_msg)

    def _HashSum(self, p_filepath):
        """ check SHA1 of DCP files.
        As defined in SMPTE 429-8-2007 section 6.3

        Returns the base64 encoded SHA-1 digest of the file.
        """
        _sha1 = hashlib.sha1()
        with open(p_filepath, 'rb') as _f:
            # Read by chunks of 10 MiB to keep memory usage bounded.
            for _buff in iter(partial(_f.read, 10 * 1024**2), b''):
                _sha1.update(_buff)
        return base64.b64encode(_sha1.digest()).decode('ASCII')


def tdcpb_check_files(p_dcp_folder, strict=True):
    '''
    Check the contents of a DCP folder.

    If strict is False, the check will succeed when errors are fixable (e.g.
    presence of unexpected files).

    Raises TdcpbException when the folder is missing or the DCP is invalid.
    '''
    _dcp_folder = os.path.abspath(p_dcp_folder)
    _dcp_name = os.path.basename(_dcp_folder)
    logger.info('File check started for {}'.format(_dcp_name))
    # do some basic check
    if not os.path.exists(_dcp_folder):
        _msg = "dcp directory {} does not exist".format(_dcp_folder)
        raise TdcpbException(_msg)
    # TODO : why not use normpath ?
    print("Verfication DCP en cours ... : {}".format(_dcp_name))
    try:
        DCP = DiParser(_dcp_folder)
        _nb = DCP.check_files(strict=strict)
    except DiError as _err:
        raise TdcpbException(_err) from _err
    if _nb == 0:
        _err = "DCP {} not well formed ".format(_dcp_name)
        raise TdcpbException(_err)
    logger.info('File check OK for {}'.format(_dcp_name))
    print("Verfication DCP courte OK : {}".format(_dcp_name))


def tdcpb_check_hash(p_dcp_folder):
    '''
    Check integrity (checksum verification) of a DCP.

    This usually takes a long time.

    Raises TdcpbException when the folder is missing or a check fails.
    '''
    # Resolve the path first so that the name is right even when the
    # folder is given with a trailing separator.
    _dcp_folder = os.path.abspath(p_dcp_folder)
    _dcp_name = os.path.basename(_dcp_folder)
    logger.info("Hash Check started for {}".format(_dcp_name))
    # do some basic check
    if not os.path.exists(p_dcp_folder):
        _msg = "dcp directory {} does not exist".format(p_dcp_folder)
        raise TdcpbException(_msg)
    print("Verfication DCP en cours ... : {}".format(_dcp_name))
    try:
        DCP = DiParser(_dcp_folder)
        _res = DCP.check_hash()
    except DiError as _err:
        raise TdcpbException(_err) from _err
    if _res != 'OK':
        _err = "DCP hash verfication failed"
        print("Verfication DCP ERREUR : {}".format(_dcp_name))
        raise TdcpbException(_err)
    print("Verfication DCP longue OK : {}".format(_dcp_name))
    logger.info("Hash OK for {}".format(_dcp_name))


def tdcpb_check(p_dcp_folder, p_check_type=u"short"):
    """Run the "short" (files) or "long" (hash) check on a DCP folder.

    Raises TdcpbException on failure or unknown check type.
    """
    if p_check_type == u"short":
        tdcpb_check_files(p_dcp_folder)
    elif p_check_type == u"long":
        tdcpb_check_hash(p_dcp_folder)
    else:
        _err = "unknow verfication type:{}".format(p_check_type)
        logger.error(_err)
        raise TdcpbException(_err)


def main(argv):
    """Command line entry point.

    argv is the full argument vector, program name included. Returns the
    process exit status: 0 on success, 1 on error.
    """
    parser = argparse.ArgumentParser(
        description='DCP intgrity(hash) verification')
    parser.add_argument('dcp_path',
                        metavar='DCP_PATH',
                        type=str,
                        nargs="?",
                        help='DCP path')
    parser.add_argument('-t', '--type',
                        help='type of DCP check: short or long. '
                             'Default: short',
                        default='short')
    # Parse the arguments received rather than sys.argv, skipping the
    # program name.
    args = parser.parse_args(argv[1:] if argv else None)
    if not args.dcp_path:
        logger.error("No DCP source")
        return 1
    try:
        tdcpb_check(args.dcp_path, args.type)
    except TdcpbException as _err:
        logger.error(_err)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))