-
代码仓库:https://github.com/mongodb/mongo-python-driver
-
ObjectId 类所在的文件
mongo-python-driver/bson/objectid.py
-
生成 ObjectId 的代码。完整的 ObjectId 类还包括解析 ObjectId 的方法,这里只摘录生成 ObjectId 的部分。
import binascii
import os
from random import SystemRandom
import struct
import threading
import time
# Pre-compiled big-endian packers, built once at import time:
#   ">I"   -> one unsigned 32-bit integer (4 bytes)
#   ">I5s" -> one unsigned 32-bit integer followed by 5 raw bytes (9 bytes)
_PACK_INT_RANDOM = struct.Struct(">I5s").pack
_PACK_INT = struct.Struct(">I").pack

# Largest value that fits in 3 bytes (24 bits).
_MAX_COUNTER_VALUE = (1 << 24) - 1
def _random_bytes() -> bytes:
    """Return 5 bytes from ``os.urandom`` for the random field of an ObjectId."""
    return os.urandom(5)
class ObjectId:
    """A 12-byte identifier built from a timestamp, a random value and a counter.

    Layout of the generated bytes, in order:

    * 4 bytes: current Unix time in whole seconds, big-endian.
    * 5 bytes: random value, regenerated whenever the process ID changes.
    * 3 bytes: counter, big-endian, seeded randomly and shared by every
      instance of the class.
    """

    # Next counter value; seeded randomly and advanced under ``_inc_lock``.
    _inc = SystemRandom().randint(0, _MAX_COUNTER_VALUE)
    _inc_lock = threading.Lock()

    # PID observed when ``__random`` was last generated. ``_random`` compares
    # it with the current PID and draws a new random value on mismatch.
    _pid = os.getpid()
    __random = _random_bytes()

    def __init__(self) -> None:
        """Create an ObjectId holding a freshly generated value."""
        self.__generate()

    def __str__(self) -> str:
        """Return the stored bytes as a lowercase hexadecimal string."""
        return binascii.hexlify(self.__id).decode()

    def __generate(self) -> None:
        """Build the 12-byte value and store it on this instance."""
        # Hold the lock only while reading and advancing the shared counter.
        with ObjectId._inc_lock:
            inc = ObjectId._inc
            ObjectId._inc = (inc + 1) % (_MAX_COUNTER_VALUE + 1)

        # 4 bytes current time, 5 bytes random, 3 bytes inc.
        # _PACK_INT emits 4 bytes; [1:4] drops the leading byte, which is
        # always zero because inc never exceeds _MAX_COUNTER_VALUE.
        self.__id = _PACK_INT_RANDOM(int(time.time()), ObjectId._random()) + _PACK_INT(inc)[1:4]

    @classmethod
    def _random(cls) -> bytes:
        """Return the 5-byte random field, regenerating it if the PID changed."""
        pid = os.getpid()
        if pid != cls._pid:
            cls._pid = pid
            cls.__random = _random_bytes()
        return cls.__random
def main() -> None:
    """Generate one ObjectId and print its string form."""
    oid = ObjectId()
    print(str(oid))
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()