python minio 下载文件断点续传

创建日期: 2025-05-23 15:19 | 作者: 风波 | 浏览次数: 32 | 分类: Python

安装 MinIO

pip install minio -i https://mirrors.aliyun.com/pypi/simple/  --trusted-host mirrors.aliyun.com

下载文件断点续传

from minio import Minio
from minio.error import S3Error
import os
from loguru import logger


def download_from_minio_resume(client, bucket_name, object_name, localfile):
    done_size = 0
    if os.path.isfile(localfile):
        done_size = os.stat(localfile).st_size

    from io import BytesIO
    try:
        resp = client.get_object(
            bucket_name,
            object_name,
            offset=done_size
        )
        with open(localfile, 'ab') as f:
            f.seek(os.SEEK_END)
            for block in resp.stream(32 * 1024): # 32 KB trunks
                f.write(block)
        print(f"'{bucket_name}/{object_name}' 已下载到 '{localfile}'")
    except S3Error as e:
        print(f"下载失败: {e}")
    except Exception as e:
        print(f"发生错误: {e}")
        logger.exception(e)
    return True


def get_minio_client(conf):
    print(conf)
    # 初始化 MinIO 客户端
    client = Minio(
        conf['server'],  # MinIO 服务器地址
        access_key=conf['user'],
        secret_key=conf['passwd'],
        secure=conf['secure']  # 是否使用 HTTPS
    )
    return client


if __name__ == "__main__":
    conf = {
            "server": os.getenv("MINIO_SERVER", "172.17.0.1:9000"),
            "user": os.getenv("MINIO_USER", ""),
            "passwd": os.getenv("MINIO_PASSWORD", ""),
            "secure": True if os.getenv("MINIO_SECURE", None) else False
            }
    client = get_minio_client(conf)
    bucket_name = 'test-x001'
    object_name = "images/a.jpg"
    localfile = "x.jpg"
    download_from_minio_resume(client, bucket_name, object_name, localfile)
32 浏览
10 爬虫
0 评论