sqlalchemy 创建 postgresql 的分区表 partition table

创建日期: 2025-01-17 19:45 | 作者: 风波 | 浏览次数: 18 | 分类: SQLAlchemy

参考一

参考二

# BIG NOTE: 
# I made this becuase I thought the method was interesting. It's not useful in it's current form. 
# If you can, use an index instead. https://stackoverflow.com/a/27895337

from sqlalchemy import Column, Integer, Float, ForeignKey, event, DDL
from sqlalchemy.schema import CreateSchema
from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
from sqlalchemy.ext.declarative import declarative_base, declared_attr

Base = declarative_base()

# Make sure you're using engine.create/engine.create_all to create the tables
schema_name = 'point_partitions'
def check_schema(ddl, target, bind, state, **kw):
  return not bind.dialect.has_schema(bind, schema=schema_name)

event.listen(Base.metadata, 'before_create',
  CreateSchema(schema_name).execute_if(callable_=check_schema)
)

Double = Float().with_variant(DOUBLE_PRECISION(), 'postgresql')

# The partitioned tables are attached after being created as described here:
# https://stackoverflow.com/questions/61545680/postgresql-partition-and-sqlalchemy
# To make this nice, we create:
#  - A mixin which has the properties of the table
#  - The base table to be partitioned
#  - Each table which is a partition separately in a for loop

class PointMixin:
  # Point group is what we are partitioning by
  @declared_attr
  def point_group_id(self):
    return Column(Integer, ForeignKey('point_group.point_group_id', ondelete='CASCADE'), primary_key=True)

  # We don't set a backref/back_populates, else it would conflict on each of the partition tables.

  t = Column(Double, nullable=False, primary_key=True)
  x = Column(Double, nullable=False)
  y = Column(Double, nullable=False)
  z = Column(Double, nullable=True)

class Point(PointMixin, Base):
  __tablename__ = 'point'
  __table_args__ = {
    'postgresql_partition_by': 'RANGE(point_group_id)',
  }

PARTITION_RANGE = 10
def PointX(point_group_id):
  gid_lo = point_group_id
  gid_hi = gid_lo + PARTITION_RANGE - 1
  nm = f'partition_{gid_lo:08d}_{gid_hi:08d}'
  # Using `type` to instantiate allows us to set the class name before the class is created
  # Trying to set __name__ after creating the class fails because the Base.metadata metaclass has already
  # registered the class by it's name after instantiation, replacing the previous partition table.
  # i.e. This DOESN'T work:
  # 
  #     class PointPartitionX(PointMixin, Base):
  #       __name__ = f'PointPartition{gid_lo}'
  #   
  # Because __name__ is set after we have declared PointPartitionX being a subclass of Base.
  PointPartitionX = type(
    f'PointPartition{gid_lo}',
    (PointMixin, Base),
    {
      '__tablename__': nm,
      '__table_args__': { 'schema': 'point_partitions' }
    }
  )

  PointPartitionX.__table__.add_is_dependent_on(Point.__table__)

  # This DDL only runs if the table is created. Thus this code is idempotent.
  event.listen(
    PointPartitionX.__table__,
    'after_create',
    DDL(f'ALTER TABLE point ATTACH PARTITION point_partitions.{nm} FOR VALUES FROM ({gid_lo}) TO ({gid_hi+1})')
  )

  return PointPartitionX


# We will create a fixed number of partitions ahead of time.
# This has the downside that if the database grows too large, we need to increase this number.
# But, dynamic partitions have their own problems. https://stackoverflow.com/a/7892660
for x in range(0, 100, PARTITION_RANGE):
  part = PointX(x)

# After this, you can just insert into the table like normal and Postgresql will allocate the rows to the partitions

参考三

from sqlalchemy import event

class MeasureMixin:
    city_id = Column(Integer, not_null=True)
    log_date = Column(Date, not_null=True)
    peaktemp = Column(Integer)
    unitsales = Column(Integer)

class Measure(MeasureMixin, Base):
    __tablename__ = 'measures'
    __table_args__ = {
        postgresql_partition_by: 'RANGE (log_date)'
    }

class Measure2020(MeasureMixin, Base):
    __tablename__ = 'measures2020'

Measure2020.__table__.add_is_dependent_on(Measure.__table__)

event.listen(
    Measure2020.__table__,
    "after_create",
    DDL("""ALTER TABLE measures ATTACH PARTITION measures2020
VALUES FROM ('2020-01-01') TO ('2021-01-01');""")
)

或者

from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.sql.ddl import DDL
from sqlalchemy import event


class PartitionByYearMeta(DeclarativeMeta):
    def __new__(cls, clsname, bases, attrs, *, partition_by):
        @classmethod
        def get_partition_name(cls_, key):
            # 'measures' -> 'measures_2020' (customise as needed)
            return f'{cls_.__tablename__}_{key}'

        @classmethod
        def create_partition(cls_, key):
            if key not in cls_.partitions:

                Partition = type(
                    f'{clsname}{key}', # Class name, only used internally
                    bases,
                    {'__tablename__': cls_.get_partition_name(key)}
                )

                Partition.__table__.add_is_dependent_on(cls_.__table__)

                event.listen(
                    Partition.__table__,
                    'after_create',
                    DDL(
                        # For non-year ranges, modify the FROM and TO below
                        f"""
                        ALTER TABLE {cls_.__tablename__}
                        ATTACH PARTITION {Partition.__tablename__}
                        FOR VALUES FROM ('{key}-01-01') TO ('{key+1}-01-01');
                        """
                    )
                )

                cls_.partitions[key] = Partition

            return cls_.partitions[key]

        attrs.update(
            {
                # For non-RANGE partitions, modify the `postgresql_partition_by` key below
                '__table_args__': attrs.get('__table_args__', ())
                + (dict(postgresql_partition_by=f'RANGE({partition_by})'),),
                'partitions': {},
                'partitioned_by': partition_by,
                'get_partition_name': get_partition_name,
                'create_partition': create_partition
            }
        )

        return super().__new__(cls, clsname, bases, attrs)
class MeasureMixin:
    # The columns need to be pulled out into this mixin
    # Note: any foreign key columns will need to be wrapped like this:

    @declared_attr
    def city_id(self):
        return Column(ForeignKey('cities.id'), not_null=True)

    log_date = Column(Date, not_null=True)
    peaktemp = Column(Integer)
    unitsales = Column(Integer)

class Measure(MeasureMixin, Base, metaclass=PartitionByYearMeta, partition_by='logdate'):
    __tablename__ = 'measures'
# Make sure you commit any session that is currently open, even for select queries:
session.commit()

Partition = Measure.create_partition(2020)
if not engine.dialect.has_table(Partition.__table__.name):
    Partition.__table__.create(bind=engine)

参考四

from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.sql.ddl import DDL
from sqlalchemy import event

class PartitionByMeta(DeclarativeMeta):
    def __new__(cls, clsname, bases, attrs, *, partition_by, partition_type):

        @classmethod
        def get_partition_name(cls_, suffix):
            return f'{cls_.__tablename__}_{suffix}'

        @classmethod
        def create_partition(cls_, suffix, partition_stmt, subpartition_by=None, subpartition_type=None):
            if suffix not in cls_.partitions:

                partition = PartitionByMeta(
                    f'{clsname}{suffix}',
                    bases,
                    {'__tablename__': cls_.get_partition_name(suffix)},
                    partition_type = subpartition_type,
                    partition_by=subpartition_by,
                )

                partition.__table__.add_is_dependent_on(cls_.__table__)

                event.listen(
                    partition.__table__,
                    'after_create',
                    DDL(
                        # For non-year ranges, modify the FROM and TO below
                        # LIST: IN ('first', 'second');
                        # RANGE: FROM ('{key}-01-01') TO ('{key+1}-01-01')
                        f"""
                        ALTER TABLE {cls_.__tablename__}
                        ATTACH PARTITION {partition.__tablename__}
                        {partition_stmt};
                        """
                    )
                )

                cls_.partitions[suffix] = partition

            return cls_.partitions[suffix]

        if partition_by is not None:
            attrs.update(
                {
                    '__table_args__': attrs.get('__table_args__', ())
                    + (dict(postgresql_partition_by=f'{partition_type.upper()}({partition_by})'),),
                    'partitions': {},
                    'partitioned_by': partition_by,
                    'get_partition_name': get_partition_name,
                    'create_partition': create_partition
                }
            )

        return super().__new__(cls, clsname, bases, attrs)
class VehicleData(VehicleDataMixin, Project, metaclass=PartitionByMeta, partition_by='timestamp',partition_type='RANGE'):
    __tablename__ = 'vehicle_data'
    __table_args__ = (
        Index('ts_ch_nod_idx', "timestamp", "nodeid", "channelid", postgresql_using='brin'),
        UniqueConstraint('timestamp','nodeid','channelid', name='ts_ch_nod_constr')
    )
    for y in range(2017, 2021): 
         # Creating tables for all known nodeids
        tbl_vehid_y = VehicleData.create_partition(
            f"{y}", partition_stmt=f"""FOR VALUES FROM ('{y}-01-01') TO ('{y+1}-01-01')""",
            subpartition_by='nodeid', subpartition_type='LIST'
        )

        for i in {3, 4, 7, 9}:
            # Creating all the years below these nodeids including a default partition
            tbl_vehid_y.create_partition(
                f"nid{i}", partition_stmt=f"""FOR VALUES IN ('{i}')"""
            )

        # Defaults (nodeid) per year partition
        tbl_vehid_y.create_partition("def", partition_stmt="DEFAULT")

   # Default to any other year than anticipated
   VehicleData.create_partition("def", partition_stmt="DEFAULT")
18 浏览
14 爬虫
0 评论