python minio 上传文件

创建日期: 2025-05-23 11:15 | 作者: 风波 | 浏览次数: 25 | 分类: Python

安装 minio

pip install minio -i https://mirrors.aliyun.com/pypi/simple/  --trusted-host mirrors.aliyun.com

上传文件

from minio import Minio
from minio.error import S3Error
import os
from loguru import logger


def get_file_mime(localfile):
    """Guess the MIME type of a file from its name.

    The type is looked up with ``mimetypes.guess_type``; when no type can
    be guessed, ``"application/octet-stream"`` is used instead. The chosen
    type is printed and then returned.
    """
    import mimetypes

    # guess_type returns a (type, encoding) pair; only the type is used.
    guessed = mimetypes.guess_type(localfile)[0]
    content_type = "application/octet-stream" if guessed is None else guessed
    print(f"file content type: {content_type}")
    return content_type


def upload_image_to_minio(client, bucket_name, object_name, localfile, mime=None):
    """Upload a local file to an existing bucket.

    Args:
        client: Object providing ``bucket_exists`` and ``fput_object``
            (for example the client built by ``get_minio_client``).
        bucket_name: Name of the target bucket; it must already exist.
        object_name: Key under which the file is stored in the bucket.
        localfile: Path of the local file to upload.
        mime: Content type to store with the object. When ``None`` it is
            derived from ``localfile`` via ``get_file_mime``.

    Returns:
        ``True`` when the upload succeeded, ``False`` when the bucket does
        not exist or any error occurred (errors are reported, not raised).
    """
    try:
        if not client.bucket_exists(bucket_name):
            print(f"Bucket '{bucket_name}' 不存在")
            return False
        if mime is None:
            mime = get_file_mime(localfile)

        client.fput_object(
            bucket_name,
            object_name,
            localfile,
            content_type=mime,
        )
    except S3Error as e:
        print(f"上传失败: {e}")
        return False
    except FileNotFoundError:
        print(f"本地文件 '{localfile}' 不存在")
        return False
    except Exception as e:
        # Last-resort boundary: report and log with traceback instead of
        # crashing the caller.
        print(f"发生错误: {e}")
        logger.exception(e)
        return False
    else:
        print(f"图片 '{localfile}' 已成功上传到 '{bucket_name}/{object_name}'")
        # Explicit success value so callers can tell success from failure;
        # previously every path except "bucket missing" returned None.
        return True


def upload_data_to_minio(client, bucket_name, object_name, bdata, mime=None):
    """Upload an in-memory bytes payload to an existing bucket.

    Args:
        client: Object providing ``bucket_exists`` and ``put_object``
            (for example the client built by ``get_minio_client``).
        bucket_name: Name of the target bucket; it must already exist.
        object_name: Key under which the data is stored in the bucket.
        bdata: The bytes to upload.
        mime: Content type to store with the object. When ``None`` the
            previous hard-coded value ``"image/jpeg"`` is used, so callers
            that never passed ``mime`` see no change.

    Returns:
        ``True`` when the upload succeeded, ``False`` when the bucket does
        not exist or any error occurred (errors are reported, not raised).
    """
    from io import BytesIO

    # Honor the caller's MIME type; it used to be silently ignored.
    content_type = "image/jpeg" if mime is None else mime

    try:
        if not client.bucket_exists(bucket_name):
            print(f"Bucket '{bucket_name}' 不存在")
            return False

        client.put_object(
            bucket_name,
            object_name,
            BytesIO(bdata),
            len(bdata),
            content_type=content_type,
        )
    except S3Error as e:
        print(f"上传失败: {e}")
        return False
    except Exception as e:
        # Last-resort boundary: report and log with traceback instead of
        # crashing the caller.
        print(f"发生错误: {e}")
        logger.exception(e)
        return False
    else:
        # The old message referenced `localfile`, which does not exist in
        # this function and raised NameError after a successful upload.
        print(f"数据 ({len(bdata)} 字节) 已成功上传到 '{bucket_name}/{object_name}'")
        return True


def get_minio_client(conf):
    """Build a ``Minio`` client from a configuration mapping.

    Args:
        conf: Mapping with the keys ``'server'`` (host:port of the MinIO
            server), ``'user'`` (access key), ``'passwd'`` (secret key) and
            ``'secure'`` (whether to use HTTPS).

    Returns:
        The constructed ``Minio`` client.
    """
    # Only the non-secret settings are echoed; printing the whole mapping
    # would leak the access key and secret key to stdout/logs.
    print(f"MinIO server: {conf['server']}, secure: {conf['secure']}")
    client = Minio(
        conf['server'],
        access_key=conf['user'],
        secret_key=conf['passwd'],
        secure=conf['secure'],
    )
    return client


if __name__ == "__main__":
    # Demo: upload ./a.jpg three times (explicit MIME, guessed MIME, and
    # from an in-memory buffer) using settings taken from the environment.

    # Treat common "off" spellings as False; previously any non-empty value
    # (including "false" or "0") switched HTTPS on.
    secure_flag = os.getenv("MINIO_SECURE", "").strip().lower()
    conf = {
        "server": os.getenv("MINIO_SERVER", "172.17.0.1:9000"),
        "user": os.getenv("MINIO_USER", ""),
        "passwd": os.getenv("MINIO_PASSWORD", ""),
        "secure": secure_flag not in {"", "0", "false", "no", "off"},
    }
    client = get_minio_client(conf)
    bucket_name = 'test-x001'
    localfile = './a.jpg'
    object_name = 'images/a.jpg'
    upload_image_to_minio(client, bucket_name, object_name, localfile, mime='image/jpeg')
    upload_image_to_minio(client, bucket_name, object_name, localfile)
    # Read through a context manager so the file handle is closed promptly.
    with open(localfile, 'rb') as fh:
        payload = fh.read()
    upload_data_to_minio(client, bucket_name, object_name, payload, mime='image/jpeg')
25 浏览
11 爬虫
0 评论