May 10, 2024 15:25 by wst
Data processing involves a large number of tar files, and they need to be checked for integrity.
The approach used: if one record can be read correctly from a file, the file is considered intact.
But there are tens of thousands of tar files, so checking them one at a time is too slow.
The method used here is a thread pool; the code is as follows:
# -*- encoding: utf-8 -*-
'''
@File        : check_dataxyz.py
@Author      : shitao
@Time        : 2024/05/09 13:56:11
@Description : Check the integrity of the downloaded files
'''
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import logging

from webdataset import WebDataset

# Log to both a file and the console.
formatter = logging.Formatter(
    fmt="%(asctime)s %(name)s %(filename)s %(message)s",
    datefmt="%Y/%m/%d %X"
)
fh = logging.FileHandler("check.log", encoding="utf-8")
ch = logging.StreamHandler()
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger = logging.getLogger('check')   # use getLogger instead of instantiating Logger directly
logger.setLevel(logging.INFO)
logger.addHandler(fh)
logger.addHandler(ch)

def check_complete_one(f):
    """Check one tar file; it counts as intact if a single sample can be read."""
    try:
        # Reading one sample is enough: it proves that the tar header and at
        # least one record can be parsed and decoded.
        for sample in WebDataset(str(f)).decode('pil'):
            print("sample:", sample['__key__'])
            break
    except Exception as e:
        logger.info(f"bad tar: {f} ({e})")
        return False
    return True


def check_complete(dir_path: Path):
    """Check every .tar file under dir_path using a thread pool."""
    # The "with" block makes sure the pool is shut down once all futures finish.
    with ThreadPoolExecutor(64) as pool:
        results = []
        for f in dir_path.iterdir():
            if f.suffix == '.tar':
                results.append(pool.submit(check_complete_one, f))
        for result in results:
            if not result.result():
                return False
    return True

def check_good(parent_path):
    """Entry point: check each data_dir*/shards folder under parent_path."""
    logger.info("start...")
    path = Path(parent_path)
    num = 10
    folders = [path.joinpath("data_dir" + str(i + 1), "shards") for i in range(num)]
    for fold in folders:
        if check_complete(fold):
            logger.info(f"[{fold}] is good.")
        else:
            logger.info(f"[{fold}] is bad.")
    logger.info("Done.")


if __name__ == "__main__":
    # Check the continuity and integrity of the downloaded files
    parent_path = "/volumes/dataxyz_finished"
    check_good(parent_path)
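
As a rough alternative sketch (my own addition, not part of the script above): if webdataset is not installed on the machine doing the check, or you only care that the archive structure is readable, the same "read one record" idea can be approximated with the standard-library tarfile module. The names first_member_ok and check_tars_quick are made up for illustration; this version only verifies that the first tar member can be parsed, it does not decode image payloads.

import tarfile
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

def first_member_ok(path: Path) -> bool:
    """Treat the archive as intact if its first member header can be read."""
    try:
        with tarfile.open(path) as tf:
            member = tf.next()        # parse the first header block
            if member is None:        # an empty archive counts as bad here
                return False
            tf.extractfile(member)    # confirm the member is reachable
        return True
    except (tarfile.TarError, OSError):
        return False

def check_tars_quick(dir_path: Path, workers: int = 64):
    """Return the list of tar files under dir_path that failed the check."""
    tars = [f for f in dir_path.iterdir() if f.suffix == ".tar"]
    bad = []
    with ThreadPoolExecutor(workers) as pool:
        futures = {pool.submit(first_member_ok, f): f for f in tars}
        for fut in as_completed(futures):
            if not fut.result():
                bad.append(futures[fut])
    return bad

Either way the check is I/O-bound, so a thread pool is a reasonable choice; if the .decode('pil') step ever becomes the bottleneck, iterating the WebDataset without decoding, or switching to a ProcessPoolExecutor, would be the next thing to try.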