安装 MinIO
pip install minio -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
下载文件
client.fget_object()
直接下载文件并存储在本地磁盘文件
client.get_object()
下载数据到内存,需要手动存储到文件
from minio import Minio
from minio.error import S3Error
import os
from loguru import logger
def download_file_from_minio(client, bucket_name, object_name, localfile):
try:
client.fget_object(
bucket_name,
object_name,
localfile
)
print(f"视频 '{bucket_name}/{object_name}' 已下载到 '{localfile}'")
except S3Error as e:
print(f"下载失败: {e}")
except Exception as e:
print(f"发生错误: {e}")
logger.exception(e)
def download_data_from_minio(client, bucket_name, object_name, localfile):
from io import BytesIO
try:
bdata = BytesIO()
resp = client.get_object(
bucket_name,
object_name
)
for block in resp.stream(32 * 1024): # 32 KB trunks
bdata.write(block)
bdata.seek(0)
with open(localfile, 'wb') as f:
f.write(bdata.read())
print(f"视频 '{bucket_name}/{object_name}' 已下载到 '{localfile}'")
except S3Error as e:
print(f"下载失败: {e}")
except Exception as e:
print(f"发生错误: {e}")
logger.exception(e)
def get_minio_client(conf):
print(conf)
# 初始化 MinIO 客户端
client = Minio(
conf['server'], # MinIO 服务器地址
access_key=conf['user'],
secret_key=conf['passwd'],
secure=conf['secure'] # 是否使用 HTTPS
)
return client
if __name__ == "__main__":
conf = {
"server": os.getenv("MINIO_SERVER", "172.17.0.1:9000"),
"user": os.getenv("MINIO_USER", ""),
"passwd": os.getenv("MINIO_PASSWORD", ""),
"secure": True if os.getenv("MINIO_SECURE", None) else False
}
client = get_minio_client(conf)
bucket_name = 'test-x001'
object_name = "images/a.jpg"
localfile = "x.jpg"
download_file_from_minio(client, bucket_name, object_name, localfile)