MongoDB 官方 Python 库中生成 ObjectId 的方式

创建日期: 2025-05-15 10:56 | 作者: 风波 | 浏览次数: 30 | 分类: Python
  1. 代码仓库:https://github.com/mongodb/mongo-python-driver

  2. ObjectId 类所在的文件 mongo-python-driver/bson/objectid.py

  3. 生成 ObjectId 的代码。完整的 ObjectId 类还包括解析 ObjectId 的方法,这里只摘录生成 ObjectId 的部分:

import binascii
import os
from random import SystemRandom
import struct
import threading
import time


# Largest value the 3-byte ObjectId counter can hold (2**24 - 1).
_MAX_COUNTER_VALUE = (1 << 24) - 1
# Packs one unsigned 32-bit integer, big-endian, into 4 bytes.
_PACK_INT = struct.Struct(">I").pack
# Packs an unsigned 32-bit big-endian integer followed by a 5-byte string (9 bytes total).
_PACK_INT_RANDOM = struct.Struct(">I5s").pack



def _random_bytes() -> bytes:
    """Return five bytes from ``os.urandom`` for use as an ObjectId's random field."""
    field_size = 5
    return os.urandom(field_size)


class ObjectId:
    """Generate a 12-byte identifier and expose it as a hexadecimal string.

    The generated value is laid out as a 4-byte big-endian timestamp in
    seconds, a 5-byte random field, and the low 3 bytes of a counter.
    """

    # Counter shared through the ObjectId class; seeded with a random value
    # in the range [0, _MAX_COUNTER_VALUE].
    _inc = SystemRandom().randint(0, _MAX_COUNTER_VALUE)
    # Held while the counter is read and advanced.
    _inc_lock = threading.Lock()
    # Process id recorded when the random field below was produced.
    _pid = os.getpid()
    # 5-byte random field reused by every id until the process id changes.
    __random = _random_bytes()

    def __init__(self):
        """Create a new identifier immediately."""
        self.__generate()

    def __str__(self) -> str:
        """Return the identifier as a lowercase hexadecimal string."""
        return self.__id.hex()

    def __generate(self) -> None:
        """Build the 12 raw bytes of this identifier and store them on the instance."""
        # Take the current counter value and advance it (wrapping back to 0
        # after _MAX_COUNTER_VALUE) while holding the lock.
        with ObjectId._inc_lock:
            counter = ObjectId._inc
            ObjectId._inc = (counter + 1) % (_MAX_COUNTER_VALUE + 1)

        # 4 bytes of current time followed by the 5 random bytes.
        timestamp = int(time.time())
        prefix = _PACK_INT_RANDOM(timestamp, ObjectId._random())
        # The counter is packed into 4 bytes; only the low 3 are kept.
        suffix = _PACK_INT(counter)[1:4]
        self.__id = prefix + suffix

    @classmethod
    def _random(cls) -> bytes:
        """Return the 5-byte random field, regenerating it if the process id changed."""
        current_pid = os.getpid()
        if current_pid == cls._pid:
            return cls.__random

        # The pid differs from the recorded one (e.g. after a fork), so draw
        # a fresh random field and remember the new pid.
        cls._pid = current_pid
        cls.__random = _random_bytes()
        return cls.__random

def main():
    """Create one ObjectId and print its hexadecimal string form."""
    print(ObjectId())


if __name__ == "__main__":
    main()
30 浏览
14 爬虫
0 评论