python 遍历 minio bucket 文件

创建日期: 2025-05-23 17:19 | 作者: 风波 | 浏览次数: 25 | 分类: Python

遍历 bucket 文件

from minio import Minio
from minio.error import S3Error
from minio.commonconfig import ENABLED
from minio.versioningconfig import VersioningConfig
import os
from loguru import logger
import json


def list_bucket_object(client, bucket_name, prefix=None, recursive=False, start_after=None):
    def object_detail(object_name):
        try:
            obj_info = client.stat_object(bucket_name, object_name)

            print(f"    文件名: {obj_info.object_name}")
            print(f"    大小: {obj_info.size} 字节")
            print(f"    最后修改时间: {obj_info.last_modified}")
            print(f"    ETag: {obj_info.etag}")
            print(f"    Content-Type: {obj_info.content_type}")
            print(f"    元数据: {obj_info.metadata}\n")
            print('    ' + "="*50)
        except Exception as e:
            logger.error(e)

    try:
        # 获取 bucket 中的对象列表
        objects = client.list_objects(bucket_name, prefix=prefix, recursive=recursive, start_after=start_after)

        print(f"Bucket '{bucket_name}' 中的文件列表:")
        for obj in objects:
            print(f"- {obj.object_name} (大小: {obj.size} 字节, 最后修改时间: {obj.last_modified})")
            object_detail(obj.object_name)

    except S3Error as e:
        print(f"获取文件列表失败: {e}")
    print("--------------------------------")


def get_minio_client(conf):
    print(conf)
    # 初始化 MinIO 客户端
    client = Minio(
        conf['server'],  # MinIO 服务器地址
        access_key=conf['user'],
        secret_key=conf['passwd'],
        secure=conf['secure']  # 是否使用 HTTPS
    )
    return client


if __name__ == "__main__":
    conf = {
            "server": os.getenv("MINIO_SERVER", "172.17.0.1:9000"),
            "user": os.getenv("MINIO_USER", ""),
            "passwd": os.getenv("MINIO_PASSWORD", ""),
            "secure": True if os.getenv("MINIO_SECURE", None) else False
            }
    client = get_minio_client(conf)
    bucket_name = 'test-x001'
    list_bucket_object(client, bucket_name)
    list_bucket_object(client, bucket_name, recursive=True)
    list_bucket_object(client, bucket_name, prefix="images/", recursive=True)
    list_bucket_object(client, bucket_name, prefix="ebooks/", recursive=True)
25 浏览
14 爬虫
0 评论