utopia-tools/check-dcp

588 lines
21 KiB
Python
Executable File

#!/bin/python3
import sys
import os.path
import argparse
import base64
import logging as logger
from lxml import etree
from urllib.parse import urlparse
import hashlib
from functools import partial
class TdcpbException(Exception):
    """Base class for the errors raised by the DCP checking functions.

    Args:
        message: Human-readable description; it is what ``str()`` returns.
        errors: Optional extra detail supplied by the raiser. It is kept on
            the instance as ``errors`` (``None`` when not given).
    """

    # Class-level default so that subclasses which do not call this
    # initializer still expose an ``errors`` attribute.
    errors = None

    def __init__(self, message, errors=None):
        Exception.__init__(self, message)
        # Keep the optional payload instead of silently discarding it.
        self.errors = errors
def list_all_files(p_path):
    """List every file and every empty folder found under ``p_path``.

    Args:
        p_path: Directory to walk recursively.

    Returns:
        A sorted list of paths relative to ``p_path``. Folders that contain
        at least one entry are not listed themselves; only their content is.
    """
    _entries = []
    _total_size = 0
    _folder_count = 0
    for _root, _sub_folders, _files in os.walk(p_path):
        _folder_count += len(_sub_folders)
        # Empty folders are reported too: they hold no file that would
        # otherwise reveal their presence.
        for _folder in _sub_folders:
            _folder_path = os.path.join(_root, _folder)
            if not os.listdir(_folder_path):
                _entries.append(_folder_path)
        for _name in _files:
            _file_path = os.path.join(_root, _name)
            _total_size += os.path.getsize(_file_path)
            _entries.append(_file_path)
    logger.debug("Total Size is {0} bytes".format(_total_size))
    logger.debug("Total Files: {} ".format(len(_entries)))
    logger.debug("Total Folders: {}".format(_folder_count))
    # relpath() is used instead of splitting on p_path: splitting cut the
    # first character of each name when p_path ended with a separator and
    # returned a wrong fragment when p_path occurred again inside a path.
    return sorted(os.path.relpath(_entry, p_path) for _entry in _entries)
def URItoPath(p_path):
    """Turn a URI (or plain path) string into a path without leading "/".

    Surrounding whitespace is stripped, the string is parsed as a URL and
    the network location and path components are concatenated. A single
    leading "/" is removed from the result.

    Args:
        p_path: URI or path text.

    Returns:
        The resulting path string.
    """
    _uri = urlparse(p_path.strip())
    _location = _uri.netloc + _uri.path
    # Only one leading "/" is dropped, never more.
    return _location[1:] if _location.startswith("/") else _location
class DiError(TdcpbException):
    """Error raised while parsing or verifying a DCP.

    Args:
        value: Description of the problem. It is kept as ``value`` and
            ``str()`` returns its ``repr()``.
    """

    def __init__(self, value):
        # Initialize the base class so that ``args`` (and therefore
        # ``repr()`` of the exception) carries the message.
        TdcpbException.__init__(self, value)
        self.value = value

    def __str__(self):
        return repr(self.value)
class AssetmapParser(object):
    """Parse and validate an ASSETMAP XML file.

    The constructor parses the file, detects the XML namespace of the root
    element and runs ``_Verify``; any problem is reported as ``DiError``.

    Attributes set by the constructor:
        p_xml_path: Path given by the caller.
        tree / root: Parsed document and its root element.
        ns: Prefix-to-namespace mapping used for every lookup.
        assets: Dict filled by ``GetAllAssets``.
    """

    def __init__(self, p_xml_path):
        """Parse ``p_xml_path`` and validate its structure.

        Raises:
            DiError: The file cannot be read, is not well-formed XML, or
                fails ``_Verify``.
        """
        self.p_xml_path = p_xml_path
        try:
            self.tree = etree.parse(self.p_xml_path)
        except IOError as msg:
            _msg = "Parser Error in file {}: {}".format(self.p_xml_path, msg)
            raise DiError(_msg)
        except etree.XMLSyntaxError as _msg:
            _err = "File {}: {}".format(self.p_xml_path, _msg)
            raise DiError(_err)
        self.root = self.tree.getroot()
        self._GetNamespaces()
        self.assets = {}
        self._Verify()

    def Dump(self):
        """Placeholder; does nothing."""
        pass

    def GetAllAssets(self):
        """Return a dict mapping each asset Id to its path.

        The path is the text of the asset's Chunk ``Path`` element passed
        through ``URItoPath``. The dict is also kept as ``self.assets``.
        """
        _assets = self.tree.findall('./am:AssetList/am:Asset',
                                    namespaces=self.ns)
        for _asset in _assets:
            _id = _asset.find('am:Id', namespaces=self.ns)
            _path = _asset.find("./am:ChunkList/am:Chunk/am:Path",
                                namespaces=self.ns)
            self.assets[_id.text] = URItoPath(_path.text)
        return self.assets

    def GetPkls(self):
        """Return the Ids of the assets flagged as packing lists.

        The list is also kept as ``self.pkls``.
        """
        _assets = self.tree.findall('./am:AssetList/am:Asset',
                                    namespaces=self.ns)
        self.pkls = [
            _asset.find('am:Id', namespaces=self.ns).text
            for _asset in _assets
            if self._IsPackingList(_asset)
        ]
        return self.pkls

    def _IsPackingList(self, p_asset):
        """Tell whether ``p_asset`` has a PackingList child not set to "false"."""
        _pkl = p_asset.find('am:PackingList', namespaces=self.ns)
        return _pkl is not None and _pkl.text != "false"

    def _GetNamespaces(self):
        """Read the namespace of the root element and store it as ``self.ns``.

        Raises:
            DiError: The root element carries no namespace.
        """
        _root_tag = self.tree.getroot().tag
        # A namespaced tag is spelled "{namespace}name". Without the brace
        # the slicing below would produce a meaningless namespace.
        if not _root_tag.startswith("{"):
            _emsg = "File {}: root element {} has no XML namespace".format(
                self.p_xml_path, _root_tag)
            raise DiError(_emsg)
        _ns = _root_tag[1:].split("}")[0]
        self.ns = {'am': _ns}
        logger.debug("Namespace is {}".format(self.ns))

    def _FindOne(self, p_tag, p_elem=None):
        """Return the single element matching ``p_tag``.

        Args:
            p_tag: Path expression using the prefix declared in ``self.ns``.
            p_elem: Element to search from; the whole tree when ``None``.

        Raises:
            DiError: Zero or several elements match.
        """
        if p_elem is not None:
            _elem = p_elem.findall(p_tag, namespaces=self.ns)
        else:
            _elem = self.tree.findall(p_tag, namespaces=self.ns)
        if len(_elem) != 1:
            _emsg = "Find tag {} {} times. Exepcted only one.".format(
                p_tag, len(_elem))
            raise DiError(_emsg)
        return _elem[0]

    def _Verify(self):
        """Check the structure of the asset map.

        Requirements enforced: VolumeCount (when present) is an integer not
        greater than 1; exactly one AssetList holding at least one Asset;
        each Asset has exactly one Id, exactly one ChunkList with exactly
        one Chunk, itself holding exactly one Path; at least one Asset is a
        packing list.

        Raises:
            DiError: One of the requirements is not met.
        """
        for _tag in self.root.findall('am:VolumeCount', namespaces=self.ns):
            try:
                _count = int(_tag.text)
            except (TypeError, ValueError):
                # Report malformed content like any other validation
                # failure instead of letting a raw exception escape.
                _emsg = "Invalid VolumeCount value: {!r}".format(_tag.text)
                raise DiError(_emsg)
            if _count > 1:
                _emsg = "The tool does not support more the one VOLINDEX"
                raise DiError(_emsg)
        _asset_list = self._FindOne('./am:AssetList')
        _assets = _asset_list.findall('./am:Asset', namespaces=self.ns)
        if not _assets:
            _emsg = "No Asset found. Invalid AssetMap"
            raise DiError(_emsg)
        _pkl_count = 0
        for _asset in _assets:
            # The lookups below are made only for their "exactly one" check.
            self._FindOne('./am:Id', _asset)
            if self._IsPackingList(_asset):
                _pkl_count += 1
            _chunk_list = self._FindOne('./am:ChunkList', _asset)
            _chunk = _chunk_list.findall('./am:Chunk', namespaces=self.ns)
            if not _chunk:
                _emsg = " No Chunk found"
                raise DiError(_emsg)
            if len(_chunk) > 1:
                _emsg = "Tool doesn't handle segmentation (i.e. several vol index)"
                raise DiError(_emsg)
            self._FindOne('./am:Path', _chunk[0])
        if _pkl_count == 0:
            _emsg = "No PKL found."
            raise DiError(_emsg)
class PklParser(AssetmapParser):
    """Parse and validate a packing list (PKL) XML file.

    Parsing and namespace detection are inherited from ``AssetmapParser``;
    validation (``_Verify``) and the namespace prefix are specific to PKLs.

    Args:
        p_xml_path: Path of the PKL file.
        pkl_urn_id: Id the PKL is expected to carry (the one found in the
            asset map).

    Raises:
        DiError: The file cannot be parsed or fails ``_Verify``.
    """

    # Header elements copied into ``pkl_data``, in display order.
    _HEADER_TAGS = ('Id', 'IssueDate', 'Issuer', 'Creator')
    # Per-asset elements that must be present / may be present.
    _MANDATORY_ASSET_TAGS = ('Hash', 'Size', 'Type')
    _OPTIONAL_ASSET_TAGS = ('OriginalFileName', 'AnnotationText')

    def __init__(self, p_xml_path, pkl_urn_id):
        # Both attributes are read by _Verify(), which the base constructor
        # calls, so they must exist before delegating.
        self.pkl_data = {}
        self.pkl_urn_id = pkl_urn_id
        AssetmapParser.__init__(self, p_xml_path)

    def _GetNamespaces(self):
        """Read the namespace of the root element and store it as ``self.ns``."""
        _ns = self.tree.getroot().tag[1:].split("}")[0]
        self.ns = {'pkl': _ns}

    def _RequiredText(self, p_asset, p_tag):
        """Return the text of child ``p_tag`` of ``p_asset``.

        Raises:
            DiError: The child element is absent.
        """
        _elem = p_asset.find('pkl:{}'.format(p_tag), namespaces=self.ns)
        if _elem is None:
            # Without this check the caller would fail on ``None.text``
            # with an exception the checking code does not handle.
            _emsg = "Missing mandatory tag {} in an Asset of PKL {}".format(
                p_tag, self.p_xml_path)
            raise DiError(_emsg)
        return _elem.text

    def GetAssets(self):
        """Return a dict mapping each asset Id to a dict of its properties.

        Each property dict holds 'Hash', 'Size' and 'Type', plus
        'OriginalFileName' and 'AnnotationText' when those elements exist.
        Values are the raw element texts. The result is also kept as
        ``self.assets``.

        Raises:
            DiError: An asset lacks its Id or one of the mandatory elements.
        """
        _assets = self.tree.findall('./pkl:AssetList/pkl:Asset',
                                    namespaces=self.ns)
        for _asset in _assets:
            _id = self._RequiredText(_asset, 'Id')
            _asset_dict = {}
            for _tag in self._MANDATORY_ASSET_TAGS:
                _asset_dict[_tag] = self._RequiredText(_asset, _tag)
            for _tag in self._OPTIONAL_ASSET_TAGS:
                _elem = _asset.find('pkl:{}'.format(_tag), namespaces=self.ns)
                if _elem is not None:
                    _asset_dict[_tag] = _elem.text
            self.assets[_id] = _asset_dict
        return self.assets

    def DumpPkl(self):
        """Print the PKL header fields (Id, IssueDate, Issuer, Creator)."""
        if not self.pkl_data:
            return
        _pkl_str = "Packing List data\n"
        for _tag in self._HEADER_TAGS:
            _pkl_str += "{:<30}: {}\n".format(_tag, self.pkl_data[_tag])
        print(_pkl_str)

    def _Verify(self):
        """Check the PKL header and fill ``self.pkl_data``.

        Id, IssueDate, Issuer, Creator and AssetList must each appear
        exactly once, and Id must equal ``self.pkl_urn_id``.

        Raises:
            DiError: One of the requirements is not met.
        """
        _tag = 'Id'
        self.pkl_data[_tag] = self._FindOne('./pkl:{}'.format(_tag)).text
        if self.pkl_data[_tag] != self.pkl_urn_id:
            _msg = "Id of PKL did not match one in AssetMap"
            logger.error("ID in PKL: {} ID of PKL in AssetMap: {}".format(
                self.pkl_data[_tag], self.pkl_urn_id))
            raise DiError(_msg)
        for _tag in self._HEADER_TAGS[1:]:
            self.pkl_data[_tag] = self._FindOne('./pkl:{}'.format(_tag)).text
        # Presence check only; the assets are read by GetAssets().
        self._FindOne('./pkl:AssetList')
class DiParser(object):
    """Check a DCP folder against its ASSETMAP and packing lists (PKL).

    Two checks are offered: ``check_files`` (presence and size of the
    assets, unexpected files) and ``check_hash`` (presence, size and SHA-1
    of the assets).

    Args:
        p_dcp_folder: Path of the DCP directory.

    Raises:
        DiError: ``p_dcp_folder`` is not an existing directory.
    """

    def __init__(self, p_dcp_folder):
        # A DCP is a directory; an existing regular file is rejected too.
        if not os.path.isdir(p_dcp_folder):
            _emsg = "Not a DCP folder: {}".format(p_dcp_folder)
            raise DiError(_emsg)
        self.p_dcp_folder = p_dcp_folder
        # Name of the VOLINDEX file, set by isVolindexPresent().
        self.volindex = ""

    def list_unexpected_files(self):
        """Return the files present in the folder but not part of the DCP.

        Runs a relaxed ``check_files`` when no check has produced the list
        yet. An empty list is returned when the check stopped before the
        comparison could be made.
        """
        if not hasattr(self, '_unexpected_files'):
            self.check_files(strict=False)
        # check_files() may return early without setting the attribute.
        return getattr(self, '_unexpected_files', [])

    def list_dcp_files(self):
        """Return the sorted list of files the DCP is expected to contain.

        The list holds every path of the asset map, the asset map file
        itself and the VOLINDEX file when one is found.

        Returns:
            The sorted list, or 0 when the asset map cannot be loaded.
        """
        try:
            self.getAssetmap()
            self._assetmap_xml = AssetmapParser(self.assetmap_path)
        except DiError as msg:
            logger.error(msg)
            return 0
        self.am_assets = self._assetmap_xml.GetAllAssets()
        _dcp_files = list(self.am_assets.values())
        _dcp_files.append(os.path.basename(self.assetmap_path))
        # VOLINDEX is not listed in the asset map, so add it explicitly.
        # Probe for it when no check did so before, and never add an empty
        # name.
        # TODO Manage case of several VOLINDEX ie with or without .xml
        if not self.volindex:
            self.isVolindexPresent()
        if self.volindex:
            _dcp_files.append(self.volindex)
        return sorted(_dcp_files)

    def _LoadAssetmap(self):
        """Locate VOLINDEX and ASSETMAP, then load the assets and PKL ids.

        On success ``self.am_assets`` and ``self.pkls`` are set.

        Returns:
            True on success; False (after logging the reason) when VOLINDEX
            or ASSETMAP is missing, the asset map is invalid or it lists no
            PKL.
        """
        if not self.isVolindexPresent():
            _msg = "No VOLINDEX found in DCP folder({}) ".format(
                self.p_dcp_folder)
            logger.error(_msg)
            return False
        try:
            self.getAssetmap()
            self._assetmap_xml = AssetmapParser(self.assetmap_path)
        except DiError as msg:
            logger.error(msg)
            return False
        self.am_assets = self._assetmap_xml.GetAllAssets()
        logger.debug("Found {} assets".format(len(self.am_assets)))
        self.pkls = self._assetmap_xml.GetPkls()
        logger.debug("Found {} PKLS".format(len(self.pkls)))
        if not self.pkls:
            logger.error("No PKL found. Bad DCP")
            return False
        return True

    def _LoadPklAssets(self, p_pkl_urn_id):
        """Parse the PKL whose Id is ``p_pkl_urn_id`` and return its assets.

        Raises:
            DiError: The PKL cannot be parsed or is invalid.
        """
        _pkl_path = os.path.join(self.p_dcp_folder,
                                 self.am_assets[p_pkl_urn_id])
        logger.debug("Parsing PKL: {}".format(os.path.basename(_pkl_path)))
        _pkl_xml = PklParser(_pkl_path, p_pkl_urn_id)
        logger.debug("Found : {} ".format(self.am_assets[p_pkl_urn_id]))
        return _pkl_xml.GetAssets()

    def check_files(self, strict=True):
        """Check that the folder content matches the asset map and PKLs.

        Every asset of every PKL must exist with the size announced in the
        PKL, and every expected file must be present. Files that are not
        part of the DCP make the check fail when ``strict`` is true and are
        only reported as warnings otherwise; they are kept in
        ``self._unexpected_files`` either way.

        Args:
            strict: Fail on unexpected files when true.

        Returns:
            The number of assets counted (PKL files included), or 0 when
            the check fails.
        """
        if not self._LoadAssetmap():
            return 0
        _nb_assets = 0
        for _pkl_urn_id in self.pkls:
            try:
                _pkl_assets = self._LoadPklAssets(_pkl_urn_id)
                # One for the PKL file itself plus the assets it lists.
                _nb_assets += 1 + self._ExistsAssets(_pkl_assets)
            except DiError as _msg:
                logger.error(_msg)
                return 0
        if _nb_assets != len(self.am_assets):
            # Reported as a warning only: this mismatch has never made the
            # check fail, and whether it should is left to the maintainers.
            _msg = "Invalid number of assets,( {} in AssetMap, {} counted)"\
                .format(len(self.am_assets), _nb_assets)
            logger.warning(_msg)
        _dcp_files = self.list_dcp_files()
        if not _dcp_files:
            # list_dcp_files() returns 0 when the asset map cannot be loaded.
            return 0
        _dir_files = list_all_files(self.p_dcp_folder)
        _expected = set(_dcp_files)
        _present = set(_dir_files)
        self._unexpected_files = sorted(_present - _expected)
        if self._unexpected_files:
            _report = logger.error if strict else logger.warning
            if not strict:
                logger.warning('Errors transformed in warning (relaxed check):')
            _report("Unexpected files or dir present in DCP folder {}".format(
                self.p_dcp_folder))
            _report("Unexpected files or dir : ")
            for _f in self._unexpected_files:
                _report("- {}".format(_f))
            _report("Files in directory = {} Expected files in DCP = {}".format(
                len(_dir_files), len(_dcp_files)))
            if strict:
                return 0
        if _expected - _present:
            # This should likely not happen as file existence got checked
            # before.
            _msg = "DCP {} contains less file then expected : files in dir = {} expected files ={} "\
                .format(self.p_dcp_folder, len(_dir_files), len(_dcp_files))
            logger.error(_msg)
            return 0
        logger.debug("files in DCP and files in Assetmap are coherent")
        return _nb_assets

    def check_hash(self):
        """Check presence, size and SHA-1 of every asset of every PKL.

        Returns:
            "OK" when everything matches, "KO" otherwise (the reason is
            logged).
        """
        if not self._LoadAssetmap():
            return "KO"
        for _pkl_urn_id in self.pkls:
            try:
                _pkl_assets = self._LoadPklAssets(_pkl_urn_id)
                self._ExistsAssets(_pkl_assets)
                self._VerifyHash()
            except DiError as _msg:
                logger.error(_msg)
                return "KO"
        return "OK"

    def Ingest(self):
        """Load the asset map, print each PKL header and verify its assets.

        Raises:
            DiError: The asset map or a PKL is invalid, or an asset fails
                verification.
        """
        self.getAssetmap()
        self._assetmap_xml = AssetmapParser(self.assetmap_path)
        self.am_assets = self._assetmap_xml.GetAllAssets()
        self.pkls = self._assetmap_xml.GetPkls()
        for _pkl in self.pkls:
            _pkl_path = os.path.join(self.p_dcp_folder, self.am_assets[_pkl])
            # PklParser needs the Id expected inside the PKL.
            _pkl_xml = PklParser(_pkl_path, _pkl)
            _pkl_assets = _pkl_xml.GetAssets()
            _pkl_xml.DumpPkl()
            # NOTE(review): this used to call a _VerifyAssets() method that
            # does not exist in this class. Presence/size and hash checks
            # are assumed to be what was meant - confirm.
            self._ExistsAssets(_pkl_assets)
            self._VerifyHash()

    def getAssetmap(self):
        """Locate the asset map file and store its path in ``assetmap_path``.

        "ASSETMAP" is looked up first, then "ASSETMAP.xml".

        Raises:
            DiError: Neither file exists in the DCP folder.
        """
        for _name, _format in (("ASSETMAP", "interop"),
                               ("ASSETMAP.xml", "SMPTE")):
            _assetmap = os.path.join(self.p_dcp_folder, _name)
            if os.path.isfile(_assetmap):
                logger.debug("The DCP is in {} format".format(_format))
                self.assetmap_path = _assetmap
                return
        _emsg = "No ASSETMAP file found"
        raise DiError(_emsg)

    def isAssetmap(self, p_file):
        """Tell whether ``p_file`` is one of the two asset map file names."""
        return p_file in ("ASSETMAP", "ASSETMAP.xml")

    def isVolindexPresent(self):
        """Look for "VOLINDEX" then "VOLINDEX.xml" in the DCP folder.

        The name found is stored in ``self.volindex``.

        Returns:
            True when one of them exists, False otherwise.
        """
        for _name in ("VOLINDEX", "VOLINDEX.xml"):
            if os.path.exists(os.path.join(self.p_dcp_folder, _name)):
                self.volindex = _name
                return True
        return False

    def _ExistsAssets(self, p_pkl_assets):
        """Check that each PKL asset exists with the announced size.

        ``self.assets`` is reset, then filled with the verified assets; each
        gets a 'Path' entry holding its location on disk.

        Args:
            p_pkl_assets: Dict of asset Id to property dict, as returned by
                ``PklParser.GetAssets``.

        Returns:
            The number of verified assets.

        Raises:
            DiError: An asset is unknown to the asset map, is missing on
                disk, or has an invalid or wrong size.
        """
        self.assets = {}
        for _k, _asset in p_pkl_assets.items():
            if _k not in self.am_assets:
                _msg = "Asset with id {} not found in assetmap".format(_k)
                raise DiError(_msg)
            _path = os.path.join(self.p_dcp_folder, self.am_assets[_k])
            # Path as shown in messages: the DCP folder prefix is cut off.
            _shown = _path[len(self.p_dcp_folder):]
            if not os.path.exists(_path):
                _msg = "Asset {} not in DCP directory".format(_shown)
                raise DiError(_msg)
            _asset['Path'] = _path
            logger.debug("Found : {} ".format(_shown))
            try:
                _pkl_size = int(_asset['Size'])
            except (TypeError, ValueError):
                # A malformed Size is a DCP error, not a program crash.
                _msg = "Asset {} has an invalid Size in PKL: {!r}".format(
                    _shown, _asset['Size'])
                raise DiError(_msg)
            _disk_size = os.stat(_path).st_size
            if _disk_size != _pkl_size:
                logger.error("stat size = {}, Size in PKL = {}".format(
                    _disk_size, _asset['Size']))
                _msg = "Asset {} has wrong size".format(_shown)
                raise DiError(_msg)
            self.assets[_k] = _asset
        return len(self.assets)

    def _VerifyHash(self):
        """Compare the hash of each asset of ``self.assets`` with its PKL value.

        Raises:
            DiError: A computed hash differs from the one in the PKL.
        """
        for _k, _v in self.assets.items():
            logger.debug("Checking hash of {}".format(_v['Path']))
            _sum = self._HashSum(_v['Path'])
            # Base64 text never contains whitespace, so blanks or line
            # breaks around the PKL value are not significant.
            _expected = (_v['Hash'] or "").strip()
            if _sum == _expected:
                logger.debug(" Hash verification OK ({})".format(_v['Path']))
                continue
            _msg = (" Hash verification failed for file {} "
                    "CALC SUM = {}\n EXPT SUM = {} ").format(
                        _v['Path'], _sum, _v['Hash'])
            logger.error(_msg)
            raise DiError(_msg)

    def _HashSum(self, p_filepath):
        """Return the base64-encoded SHA-1 digest of ``p_filepath``.

        As defined in SMPTE 429-8-2007 section 6.3. The file is read in
        chunks of 10 MiB.
        """
        _sha1 = hashlib.sha1()
        with open(p_filepath, 'rb') as _f:
            for _buff in iter(partial(_f.read, 10 * 1024**2), b''):
                _sha1.update(_buff)
        return base64.b64encode(_sha1.digest()).decode('ASCII')
def tdcpb_check_files(p_dcp_folder, strict=True):
    """Run the short (file presence) check on a DCP folder.

    Progress and success are printed and logged.

    Args:
        p_dcp_folder: Path of the DCP directory.
        strict: Forwarded to ``DiParser.check_files``. When false, the
            check succeeds despite fixable errors such as the presence of
            unexpected files.

    Raises:
        TdcpbException: The directory does not exist, the parser reports a
            ``DiError``, or the check returns 0.
    """
    _dcp_folder = os.path.abspath(p_dcp_folder)
    _dcp_name = os.path.basename(_dcp_folder)
    logger.info('File check started for {}'.format(_dcp_name))

    # Basic sanity check before any parsing.
    if not os.path.exists(_dcp_folder):
        raise TdcpbException(
            "dcp directory {} does not exist".format(_dcp_folder))

    # TODO: why not use normpath?
    print("Verfication DCP en cours ... : {}".format(_dcp_name))
    try:
        _nb = DiParser(_dcp_folder).check_files(strict=strict)
    except DiError as _err:
        raise TdcpbException(_err)

    # check_files() returns 0 to signal a failed check.
    if _nb == 0:
        raise TdcpbException("DCP {} not well formed ".format(_dcp_name))

    logger.info('File check OK for {}'.format(_dcp_name))
    print("Verfication DCP courte OK : {}".format(_dcp_name))
def tdcpb_check_hash(p_dcp_folder):
    """Run the long (checksum) check on a DCP folder.

    This usually takes a long time. Progress and the result are printed
    and logged.

    Args:
        p_dcp_folder: Path of the DCP directory.

    Raises:
        TdcpbException: The directory does not exist, the parser reports a
            ``DiError``, or the hash check does not return 'OK'.
    """
    # Resolve the path first: basename() of a path ending with a separator
    # is empty, which left the DCP name out of the first log line.
    _dcp_folder = os.path.abspath(p_dcp_folder)
    _dcp_name = os.path.basename(_dcp_folder)
    logger.info("Hash Check started for {}".format(_dcp_name))

    # Basic sanity check before any parsing.
    if not os.path.exists(_dcp_folder):
        _msg = "dcp directory {} does not exist".format(p_dcp_folder)
        raise TdcpbException(_msg)

    print("Verfication DCP en cours ... : {}".format(_dcp_name))
    try:
        _res = DiParser(_dcp_folder).check_hash()
    except DiError as _err:
        # Keep the parser error attached as the explicit cause.
        raise TdcpbException(_err) from _err

    if _res != 'OK':
        _err = "DCP hash verfication failed"
        print("Verfication DCP ERREUR : {}".format(_dcp_name))
        raise TdcpbException(_err)

    print("Verfication DCP longue OK : {}".format(_dcp_name))
    logger.info("Hash OK for {}".format(_dcp_name))
def tdcpb_check(p_dcp_folder, p_check_type=u"short"):
    """Run the requested kind of check on a DCP folder.

    Args:
        p_dcp_folder: Path of the DCP directory.
        p_check_type: "short" calls ``tdcpb_check_files``; "long" calls
            ``tdcpb_check_hash``.

    Raises:
        TdcpbException: ``p_check_type`` is neither "short" nor "long", or
            the selected check raises it.
    """
    if p_check_type == u"short":
        tdcpb_check_files(p_dcp_folder)
        return
    if p_check_type == u"long":
        tdcpb_check_hash(p_dcp_folder)
        return
    # Anything else is a usage error.
    _reason = "unknow verfication type:{}".format(p_check_type)
    logger.error(_reason)
    raise TdcpbException(_reason)
def main(argv):
    """Command-line entry point.

    Args:
        argv: Full argument vector, program name first (``sys.argv``
            style). When ``None``, the process arguments are used.

    Returns:
        0 when the check succeeds, 1 when no DCP path is given or the
        check fails.
    """
    parser = argparse.ArgumentParser(
        description='DCP intgrity(hash) verification')
    parser.add_argument('dcp_path',
                        metavar='DCP_PATH',
                        type=str,
                        nargs="?",
                        help='DCP path')
    parser.add_argument('-t', '--type',
                        help='type of DCP check: short or long. Default: short',
                        default='short')
    # Parse the vector handed in by the caller instead of always reading
    # sys.argv; argv[0] is the program name and is skipped.
    args = parser.parse_args(None if argv is None else argv[1:])
    if not args.dcp_path:
        logger.error("No DCP source")
        return 1
    try:
        tdcpb_check(args.dcp_path, args.type)
    except TdcpbException as _err:
        logger.error(_err)
        return 1
    return 0
# Run the checker when the file is executed as a script; the value returned
# by main() becomes the process exit status.
if __name__ == "__main__":
    _exit_status = main(sys.argv)
    sys.exit(_exit_status)