DeepSeek
import os
import time
import random
import struct
import socket
import threading
# 初始化计数器(随机起始值)
_counter = random.randint(0, 0xFFFFFF)
_counter_lock = threading.Lock()
def generate_object_id():
"""生成符合MongoDB规范的ObjectId"""
global _counter
# 1. 4字节时间戳(秒)
timestamp = int(time.time())
# 2. 5字节机器+进程标识(前3字节机器哈希,后2字节PID)
machine_hash = abs(hash(socket.gethostname())) % 0xFFFFFF
pid = os.getpid() % 0xFFFF
# 3. 3字节自增计数器(线程安全)
with _counter_lock:
_counter = (_counter + 1) % 0xFFFFFF
counter = _counter
# 拼接所有部分
object_id = (
struct.pack(">I", timestamp)[1:] + # 4字节时间戳(去掉最高字节,实际用3字节)
struct.pack(">I", machine_hash)[1:] + # 3字节机器哈希
struct.pack(">H", pid) + # 2字节PID
struct.pack(">I", counter)[1:] # 3字节计数器
)
return object_id.hex() # 返回24字符的十六进制字符串
# 示例使用
if __name__ == "__main__":
print("生成的ObjectId:", generate_object_id())
# 示例输出: 662f5b87a8d4a61234567890 (24字符十六进制)