安装 MinIO
pip install minio -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
下载文件断点续传
from minio import Minio
from minio.error import S3Error
import os
from loguru import logger
def download_from_minio_resume(client, bucket_name, object_name, localfile):
done_size = 0
if os.path.isfile(localfile):
done_size = os.stat(localfile).st_size
from io import BytesIO
try:
resp = client.get_object(
bucket_name,
object_name,
offset=done_size
)
with open(localfile, 'ab') as f:
f.seek(os.SEEK_END)
for block in resp.stream(32 * 1024): # 32 KB trunks
f.write(block)
print(f"'{bucket_name}/{object_name}' 已下载到 '{localfile}'")
except S3Error as e:
print(f"下载失败: {e}")
except Exception as e:
print(f"发生错误: {e}")
logger.exception(e)
return True
def get_minio_client(conf):
print(conf)
# 初始化 MinIO 客户端
client = Minio(
conf['server'], # MinIO 服务器地址
access_key=conf['user'],
secret_key=conf['passwd'],
secure=conf['secure'] # 是否使用 HTTPS
)
return client
if __name__ == "__main__":
conf = {
"server": os.getenv("MINIO_SERVER", "172.17.0.1:9000"),
"user": os.getenv("MINIO_USER", ""),
"passwd": os.getenv("MINIO_PASSWORD", ""),
"secure": True if os.getenv("MINIO_SECURE", None) else False
}
client = get_minio_client(conf)
bucket_name = 'test-x001'
object_name = "images/a.jpg"
localfile = "x.jpg"
download_from_minio_resume(client, bucket_name, object_name, localfile)