安装 minio
pip install minio -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
上传文件
client.fput_object()
通过本地文件路径上传文件(由 SDK 读取文件内容)
client.put_object()
从数据流(如 BytesIO)直接上传数据,需同时传入数据长度(length 参数)
from minio import Minio
from minio.error import S3Error
import os
from loguru import logger
def get_file_mime(localfile):
    """Return the MIME type guessed from the name of *localfile*.

    The guess comes from ``mimetypes.guess_type``; when no type can be
    determined, ``"application/octet-stream"`` is used. The chosen type is
    also printed to stdout.
    """
    from mimetypes import guess_type

    # guess_type returns (type, encoding); only the type is needed here.
    guessed = guess_type(localfile)[0]
    mime = "application/octet-stream" if guessed is None else guessed
    print(f"file content type: {mime}")
    return mime
def upload_image_to_minio(client, bucket_name, object_name, localfile, mime=None):
    """Upload a local file to a MinIO bucket via ``client.fput_object``.

    Args:
        client: Object exposing ``bucket_exists`` and ``fput_object``
            (presumably a ``minio.Minio`` instance).
        bucket_name: Name of the target bucket; it must already exist.
        object_name: Key the file is stored under inside the bucket.
        localfile: Path of the local file to upload.
        mime: Content type stored with the object. When ``None`` it is
            guessed from ``localfile`` with ``get_file_mime``.

    Returns:
        ``True`` when the upload succeeded; ``False`` when the bucket does
        not exist or any error occurred. Errors are reported on stdout
        (and logged for unexpected ones), never raised to the caller.
    """
    try:
        if not client.bucket_exists(bucket_name):
            print(f"Bucket '{bucket_name}' 不存在")
            return False

        if mime is None:
            mime = get_file_mime(localfile)

        # Pass the MIME type explicitly so the object is stored with it.
        client.fput_object(
            bucket_name,
            object_name,
            localfile,
            content_type=mime,
        )
    except S3Error as e:
        print(f"上传失败: {e}")
        return False
    except FileNotFoundError:
        print(f"本地文件 '{localfile}' 不存在")
        return False
    except Exception as e:
        # Unexpected failure: keep the best-effort contract but log the traceback.
        print(f"发生错误: {e}")
        logger.exception(e)
        return False
    else:
        print(f"图片 '{localfile}' 已成功上传到 '{bucket_name}/{object_name}'")
        return True
def upload_data_to_minio(client, bucket_name, object_name, bdata, mime=None):
    """Upload an in-memory bytes payload to a MinIO bucket via ``client.put_object``.

    Args:
        client: Object exposing ``bucket_exists`` and ``put_object``
            (presumably a ``minio.Minio`` instance).
        bucket_name: Name of the target bucket; it must already exist.
        object_name: Key the payload is stored under inside the bucket.
        bdata: The payload as ``bytes``.
        mime: Content type stored with the object. When ``None`` the
            previous hard-coded value ``"image/jpeg"`` is used, so callers
            that never passed ``mime`` see no change.

    Returns:
        ``True`` when the upload succeeded; ``False`` when the bucket does
        not exist or any error occurred. Errors are reported on stdout
        (and logged for unexpected ones), never raised to the caller.
    """
    from io import BytesIO

    # Honour the caller's MIME type instead of silently ignoring it.
    content_type = "image/jpeg" if mime is None else mime

    try:
        if not client.bucket_exists(bucket_name):
            print(f"Bucket '{bucket_name}' 不存在")
            return False

        # put_object is given a readable stream plus the payload length.
        stream = BytesIO(bdata)
        client.put_object(
            bucket_name,
            object_name,
            stream,
            len(bdata),
            content_type=content_type,
        )
    except S3Error as e:
        print(f"上传失败: {e}")
        return False
    except Exception as e:
        # Unexpected failure: keep the best-effort contract but log the traceback.
        print(f"发生错误: {e}")
        logger.exception(e)
        return False
    else:
        # No local file is involved here, so the message refers to the object only.
        print(f"数据已成功上传到 '{bucket_name}/{object_name}'")
        return True
def get_minio_client(conf):
    """Build a ``Minio`` client from a configuration mapping.

    Args:
        conf: Mapping with the keys ``server`` (host:port of the MinIO
            server), ``user`` (access key), ``passwd`` (secret key) and
            ``secure`` (whether to use HTTPS).

    Returns:
        The constructed ``Minio`` client.

    Raises:
        KeyError: If one of the required keys is missing from ``conf``.
    """
    # Echo the configuration for debugging, but never print the secret key.
    redacted = dict(conf)
    if "passwd" in redacted:
        redacted["passwd"] = "***"
    print(redacted)

    return Minio(
        conf['server'],
        access_key=conf['user'],
        secret_key=conf['passwd'],
        secure=conf['secure'],
    )
if __name__ == "__main__":
    # Demo entry point: connection settings are read from the environment.
    # MINIO_SECURE enables HTTPS unless it is unset/empty or one of the
    # explicit "off" spellings, so MINIO_SECURE=false no longer turns TLS on.
    secure_env = os.getenv("MINIO_SECURE", "")
    conf = {
        "server": os.getenv("MINIO_SERVER", "172.17.0.1:9000"),
        "user": os.getenv("MINIO_USER", ""),
        "passwd": os.getenv("MINIO_PASSWORD", ""),
        "secure": secure_env.strip().lower() not in ("", "0", "false", "no", "off"),
    }
    client = get_minio_client(conf)
    bucket_name = 'test-x001'
    localfile = './a.jpg'
    object_name = 'images/a.jpg'

    # 1) Upload from a file path with an explicit MIME type.
    upload_image_to_minio(client, bucket_name, object_name, localfile, mime='image/jpeg')
    # 2) Upload from a file path, letting the MIME type be guessed.
    upload_image_to_minio(client, bucket_name, object_name, localfile)
    # 3) Upload from memory; the context manager closes the file handle.
    with open(localfile, 'rb') as fh:
        bdata = fh.read()
    upload_data_to_minio(client, bucket_name, object_name, bdata, mime='image/jpeg')