安装 MinIO Python 客户端(SDK)
pip install minio -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
设置存储桶访问策略 & 生成限时共享链接
from minio import Minio
from minio.commonconfig import ENABLED
from minio.versioningconfig import VersioningConfig
from minio.error import S3Error
import os
from loguru import logger
from datetime import timedelta
import json
def share_file_with_expire(client, bucket_name: str, object_name: str, expireday):
    """Create a time-limited download link for an object and return it.

    Args:
        client: Object exposing ``presigned_get_object(bucket, object, expires=...)``
            (a MinIO client in this script).
        bucket_name: Bucket that holds the object.
        object_name: Key of the object inside the bucket.
        expireday: Validity of the link in days; passed to ``timedelta(days=...)``.

    Returns:
        The presigned URL produced by the client. The URL is also printed,
        as before, so existing command-line usage keeps its output.

    NOTE(review): the MinIO SDK documents an upper bound of 7 days for
    presigned URL expiry -- confirm against the installed SDK version.
    """
    download_url = client.presigned_get_object(
        bucket_name,
        object_name,
        expires=timedelta(days=expireday),
    )
    print(f"视频下载链接 ({expireday}天内有效): {download_url}")
    # Hand the link back so callers can use it programmatically instead of
    # having to scrape it from stdout.
    return download_url
def set_secure_policy(client, bucket_name: str, *, enable_versioning: bool = True):
    """Apply an anonymous read-only policy to a bucket, optionally with versioning.

    WARNING: despite the function name, the policy set here allows *anyone*
    (``Principal: "*"``) to perform ``s3:GetObject`` on every object in the
    bucket. Intended for testing only; be careful in production.

    Args:
        client: Object exposing ``set_bucket_versioning`` and
            ``set_bucket_policy`` (a MinIO client in this script).
        bucket_name: Name of the bucket to configure.
        enable_versioning: When true (the default, matching the previous
            behavior) bucket versioning is switched on before the policy is
            applied. Pass ``False`` to leave the versioning state untouched.
    """
    if enable_versioning:
        # Enable versioning only when the caller wants it.
        client.set_bucket_versioning(bucket_name, VersioningConfig(ENABLED))

    public_read_statement = {
        "Effect": "Allow",
        "Principal": "*",
        "Action": ["s3:GetObject"],
        "Resource": [f"arn:aws:s3:::{bucket_name}/*"],
    }
    policy = {
        "Version": "2012-10-17",
        "Statement": [public_read_statement],
    }
    # Public-read policy (test use only; be careful in production).
    client.set_bucket_policy(bucket_name, json.dumps(policy))
def get_minio_client(conf):
    """Build a MinIO client from a configuration mapping.

    Args:
        conf: Mapping with the keys ``server`` (MinIO server address),
            ``user`` (access key), ``passwd`` (secret key) and ``secure``
            (whether to use HTTPS).

    Returns:
        The ``Minio`` client constructed from those settings.

    Raises:
        KeyError: If one of the required keys is missing from ``conf``.
    """
    # Echo the configuration for debugging, but never the secret key:
    # print a copy with the password masked instead of the raw mapping.
    redacted = {
        key: ("***" if key == "passwd" else value)
        for key, value in conf.items()
    }
    print(redacted)

    # Initialise the MinIO client.
    return Minio(
        conf['server'],  # MinIO server address
        access_key=conf['user'],
        secret_key=conf['passwd'],
        secure=conf['secure'],  # whether to use HTTPS
    )
if __name__ == "__main__":
    # Demo entry point: read connection settings from the environment,
    # configure the demo bucket and print a 7-day share link for one object.

    # Parse MINIO_SECURE explicitly. Judging the raw string by truthiness
    # would turn HTTPS on for values such as "false" or "0". Unset, empty or
    # an explicit falsy spelling means HTTP; any other value means HTTPS.
    secure_flag = os.getenv("MINIO_SECURE", "").strip().lower()
    conf = {
        "server": os.getenv("MINIO_SERVER", "172.17.0.1:9000"),
        "user": os.getenv("MINIO_USER", ""),
        "passwd": os.getenv("MINIO_PASSWORD", ""),
        "secure": secure_flag not in {"", "0", "false", "no", "off"},
    }
    client = get_minio_client(conf)

    bucket_name = 'test-x001'
    object_name = 'images/a.jpg'
    set_secure_policy(client, bucket_name)
    share_file_with_expire(client, bucket_name, object_name, 7)