Moving files with multiple threads in Python

May 10, 2024, 13:41 by wst

Advanced Python

Problem

Moving files from one path to another took more than an hour to finish because of the sheer number of files.

That is far too slow, and there are nine more folders still to move.

Solution

The solution is multithreading. Moving files is I/O-bound work, and CPython releases the GIL during I/O, so threads actually run the moves concurrently. The code is as follows:

# -*- encoding: utf-8 -*-
'''
@File        :resume_datacomp.py
@Author      :shitao
@Time        :2024/05/09 15:31:18
@Description :Restore data from the dataset.
Goal: restore 5 (default) from 6 (finished).
Logic: if a file in the local 6 is missing from (or differs from) the dataset's 6, that local file was moved over from 5, so move it back.
'''
import json
from pathlib import Path
import shutil
import threading

# project-local logging helper
from utils.gen_log import Log

logger = Log()


def resume_one(from_list, mid_p, aim_p):
    """Process one shard of the local JSON file list."""
    mid_path = Path(mid_p)
    aim_path = Path(aim_p)
    for f in from_list:
        num = f.name.split("_")[0]
        loc_json = f
        loc_file = f.parent.joinpath(f"{num}.tar")
        loc_parq = f.parent.joinpath(f"{num}.parquet")
        with open(f) as fp:
            json_data = json.load(fp)
        logger.info("handle:{}".format(f))
        # Build the paths of the counterpart files in the remote folder
        mid_json = mid_path.joinpath(f.name)
        mid_file = mid_path.joinpath(f"{num}.tar")
        mid_parq = mid_path.joinpath(f"{num}.parquet")

        flag = True
        if not mid_json.exists():
            logger.info("Not present remotely, needs to be moved.")
            flag = False
        else:
            with open(mid_json) as fp:
                mid_data = json.load(fp)

            # Any mismatch in the stats means this is not the same shard
            for key in ["successes", "failed_to_download", "failed_to_resize", "duration", "start_time", "end_time"]:
                if json_data[key] != mid_data[key]:
                    logger.info("Stats differ, needs to be moved.")
                    flag = False
                    break

        if not flag:
            logger.info("moving files...")
            # Move each file unless it already exists at the destination
            if not aim_path.joinpath(loc_json.name).exists():
                shutil.move(loc_json, aim_path)
            if not aim_path.joinpath(loc_file.name).exists():
                shutil.move(loc_file, aim_path)
            if not aim_path.joinpath(loc_parq.name).exists():
                shutil.move(loc_parq, aim_path)
        else:
            logger.info("no move needed.")

def resume_multi(from_p, mid_p, aim_p):
    """
    from_p: 本地finished文件夹
    mid_p: 远程finished文件夹
    aim_p: 目的文件夹
    """
    from_path = Path(from_p)
    mid_path = Path(mid_p)
    aim_path = Path(aim_p)
    json_list = [f for f in from_path.iterdir() if f.suffix == ".json"]
    # Number of shards; one thread per shard
    num_shards = 8
    # Round-robin split: shard i gets items i, i + num_shards, i + 2 * num_shards, ...
    shards = [json_list[i::num_shards] for i in range(num_shards)]
    threads = []
    for shard in shards:
        thread = threading.Thread(target=resume_one, args=(shard, mid_path, aim_path))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

    logger.info("Done!")

if __name__ == "__main__":
    from_p = "/volumes/dataxyz_finished/data_dir6/shards/"
    mid_p = "/dataset/0-1-0/dataxyz_finished/data_dir6/shards"
    aim_p = "/volumes/dataxyz/d5/shards"
    resume_multi(from_p, mid_p, aim_p)
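
A more idiomatic variant is the standard library's concurrent.futures.ThreadPoolExecutor, which replaces the manual thread bookkeeping above. A minimal sketch, assuming the same resume_one and shard split as in the code (resume_multi_pool is a name made up here):

from concurrent.futures import ThreadPoolExecutor

def resume_multi_pool(from_p, mid_p, aim_p, num_shards=8):
    """Same job as resume_multi, but with a thread pool instead of bare threads."""
    from_path = Path(from_p)
    json_list = [f for f in from_path.iterdir() if f.suffix == ".json"]
    shards = [json_list[i::num_shards] for i in range(num_shards)]
    # The with-block waits for every worker to finish before returning
    with ThreadPoolExecutor(max_workers=num_shards) as pool:
        futures = [pool.submit(resume_one, shard, mid_p, aim_p) for shard in shards]
        for fut in futures:
            fut.result()  # re-raises any exception from a worker thread
    logger.info("Done!")

One practical advantage over bare threading.Thread: an exception inside a worker is not silently swallowed, because fut.result() re-raises it in the main thread.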

The data-sharding scheme here follows another article.
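
For reference, the list[i::n] slicing used above deals the items out round-robin, so the shards differ in size by at most one element:

items = list(range(10))
shards = [items[i::3] for i in range(3)]
# shards == [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]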

