使用 python 连接 clickhouse

创建日期: 2023-12-19 19:51 | 作者: 风波 | 浏览次数: 21 | 分类: ClickHouse

安装 clickhouse-driver：`pip install clickhouse-driver`

文档:https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html

使用 python 连接 clickhouse

from clickhouse_driver import Client

clickhouse_client = Client(host="localhost")  # native TCP protocol; the default port is 9000

执行 SQL 命令

sql = "SELECT version()"  # example query; replace with your own SQL statement
res = clickhouse_client.execute(sql)  # use the client object created above

使用 sqlalchemy 连接 clickhouse

来源:https://blog.csdn.net/yycoolsam/article/details/90758321

基于 SQLAlchemy 连接 ClickHouse（借助 clickhouse-sqlalchemy / clickhouse-driver 插件）。clickhouse-sqlalchemy 的项目地址：https://github.com/xzkostyan/clickhouse-sqlalchemy ；clickhouse-driver 的项目地址：https://github.com/mymarilyn/clickhouse-driver

import urllib.parse

import sqlalchemy
from clickhouse_sqlalchemy import engines, types
from sqlalchemy import Column, MetaData, Table, create_engine

host = '1.1.1.1'
user = 'default'
password = 'default'
db = 'test'


def _clickhouse_url(dialect, port):
    """Return a SQLAlchemy URL string for the ClickHouse server configured above.

    ``dialect`` selects the driver: ``clickhouse`` is used with the HTTP port
    and ``clickhouse+native`` with the native TCP port. Both the user name and
    the password are percent-encoded, because characters such as ``@``, ``:``
    or ``/`` would otherwise break URL parsing.
    """
    return '{dialect}://{user}:{password}@{host}:{port}/{db}'.format(
        dialect=dialect,
        user=urllib.parse.quote_plus(user),
        password=urllib.parse.quote_plus(password),
        host=host,
        port=port,
        db=db,
    )


# Connection-pool settings shared by both engines: 30 pooled connections with
# no overflow, a liveness check before each checkout, and connections
# recycled after 3600 seconds.
_POOL_OPTIONS = dict(pool_size=30, max_overflow=0, pool_pre_ping=True, pool_recycle=3600)

port = 8123  # HTTP port
engine = create_engine(_clickhouse_url('clickhouse', port), **_POOL_OPTIONS)

port = 9000  # native TCP/IP port
engine1 = create_engine(_clickhouse_url('clickhouse+native', port), **_POOL_OPTIONS)
# 2021-12-21
# https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/129
# 由于我使用的 SQLAlchemy 是 1.4 版本，而当时 clickhouse-sqlalchemy 的最新正式版本仅支持 1.3，所以需要安装其测试分支版本。截至发文日期，使用中暂未发现重大异常。安装命令为：
# pip install git+https://github.com/xzkostyan/clickhouse-sqlalchemy@feature-sa-1.4#egg=clickhouse-sqlalchemy==0.2.0

# 由于 clickhouse-sqlalchemy 并非 SQLAlchemy 的内置方言，pandas 无法直接通过生成的 engine 完整地写入数据（其实可以插入数据，但无法自动建表：clickhouse-sqlalchemy 建表时必须指定表引擎 ENGINE，而 pandas 创建的表没有指定）。因此这里实现了一个仿照 pandas `to_sql` 的写入功能。
# 参考文档https://github.com/xzkostyan/clickhouse-sqlalchemy
class ClickhouseDf(object):
    """Write pandas DataFrames into ClickHouse, modelled on ``DataFrame.to_sql``.

    The target table is described from the DataFrame's dtypes with a
    clickhouse-sqlalchemy table engine attached. It is created when needed,
    and the rows are then inserted through the given connection.

    Keyword Args:
        table_engine: Name of the table engine used when a table is created.
            It must be a key of ``engines_dict``. Defaults to ``"MergeTree"``.

    Raises:
        ValueError: If ``table_engine`` is not one of the supported names.
    """

    def __init__(self, **kwargs):
        # Engine names accepted through ``table_engine`` -> engine classes.
        self.engines_dict = {
            "MergeTree": engines.MergeTree,
            "AggregatingMergeTree": engines.AggregatingMergeTree,
            "GraphiteMergeTree": engines.GraphiteMergeTree,
            "CollapsingMergeTree": engines.CollapsingMergeTree,
            "VersionedCollapsingMergeTree": engines.VersionedCollapsingMergeTree,
            "SummingMergeTree": engines.SummingMergeTree,
            "ReplacingMergeTree": engines.ReplacingMergeTree,
            "Distributed": engines.Distributed,
            "ReplicatedMergeTree": engines.ReplicatedMergeTree,
            "ReplicatedAggregatingMergeTree": engines.ReplicatedAggregatingMergeTree,
            "ReplicatedCollapsingMergeTree": engines.ReplicatedCollapsingMergeTree,
            "ReplicatedVersionedCollapsingMergeTree": engines.ReplicatedVersionedCollapsingMergeTree,
            "ReplicatedReplacingMergeTree": engines.ReplicatedReplacingMergeTree,
            "ReplicatedSummingMergeTree": engines.ReplicatedSummingMergeTree,
            "View": engines.View,
            "MaterializedView": engines.MaterializedView,
            "Buffer": engines.Buffer,
            "TinyLog": engines.TinyLog,
            "Log": engines.Log,
            "Memory": engines.Memory,
            "Null": engines.Null,
            "File": engines.File
        }
        self.table_engine = kwargs.get("table_engine", "MergeTree")  # default engine
        if self.table_engine not in self.engines_dict:
            raise ValueError("No engine for this table")

    @staticmethod
    def _qualified_name(name, schema=None):
        """Return ``[schema.]name`` as backtick-quoted ClickHouse identifiers.

        Backslashes and backticks inside each part are backslash-escaped, so
        names containing special characters still form a single identifier.
        This matches how the quoted CREATE TABLE statement refers to them.
        An empty or None ``schema`` yields an unqualified name.
        """
        def quote(identifier):
            escaped = str(identifier).replace("\\", "\\\\").replace("`", "\\`")
            return f"`{escaped}`"

        if not schema:
            return quote(name)
        return f"{quote(schema)}.{quote(name)}"

    def _createORMTable(self, df, name, con, schema, **kwargs):
        """Build (without creating) a SQLAlchemy ``Table`` describing ``df``.

        Args:
            df: DataFrame whose columns and dtypes define the table columns.
            name: Table name.
            con: Connection. Not used here, because the table is later created
                or dropped with an explicit ``bind``. Kept for signature
                compatibility.
            schema: Database of the table, or None for an unqualified name.
            primary_key: (keyword) list of column names forming the key.

        Returns:
            An unbound ``Table`` with the configured table engine attached.
        """
        # pandas dtype name -> column type. Explicit ClickHouse widths are used
        # because the generic Integer/Float types render as INTEGER/FLOAT,
        # which ClickHouse treats as Int32/Float32 and which would truncate
        # int64 / float64 data. Unknown dtypes fall back to text.
        col_dtype_dict = {
            "object": sqlalchemy.Text,
            "int64": types.Int64,
            "int32": types.Int32,
            "int16": types.Int16,
            "int8": types.Int8,
            "int": types.Int64,
            "float64": types.Float64,
            "float32": types.Float32,
            "float16": types.Float32,
            "float8": types.Float32,
            "float": types.Float64,
        }
        primary_key = kwargs.get("primary_key", [])
        metadata = MetaData(schema=schema)

        columns = []
        for col in df.columns.tolist():
            col_type = col_dtype_dict.get(str(df.dtypes[col]), col_dtype_dict["object"])
            columns.append(Column(col, col_type, primary_key=col in primary_key))

        engine_cls = self.engines_dict[self.table_engine]
        # Forward a key only when there is one, so that engines which may not
        # accept a primary_key argument (e.g. Memory, Log) can be used.
        table_engine = engine_cls(primary_key=primary_key) if primary_key else engine_cls()
        return Table(name, metadata, *columns, table_engine)

    def _checkTable(self, name, con, schema):
        """Return 1 if the table exists, otherwise 0.

        The ClickHouse ``EXISTS`` statement is run for ``schema.name``, or for
        ``name`` alone when no schema is given. The query never targets a
        database literally called ``None``.
        """
        sql_str = f"EXISTS {self._qualified_name(name, schema)}"
        rows = con.execute(sqlalchemy.text(sql_str)).fetchall()
        return 0 if rows == [(0,)] else 1

    def to_sql(self, df, name: str, con, schema=None, if_exists="fail", **kwargs):
        """Insert a DataFrame into ClickHouse.

        Args:
            df: DataFrame to write.
            name: Target table name.
            con: SQLAlchemy connection to a ClickHouse engine.
            schema: Target database, or None for an unqualified table name.
            if_exists: One of ``'fail'``, ``'replace'`` or ``'append'``;
                default ``'fail'``.
                - ``'fail'`` raises if the table exists.
                - ``'replace'`` drops and recreates it.
                - ``'append'`` inserts into it.
                In every mode a missing table is created.
            primary_key: (keyword) key columns for a newly created table. For
                MergeTree it defaults to the first DataFrame column.

        Raises:
            ValueError: If ``if_exists`` is invalid, or if the table exists
                and ``if_exists='fail'``.
        """
        if if_exists not in ("fail", "replace", "append"):
            raise ValueError(f"'{if_exists}' is not valid for if_exists")

        # Engines that require a key (only MergeTree has been tested so far).
        if self.table_engine in ["MergeTree"]:
            self.primary_key = kwargs.get("primary_key", [df.columns.tolist()[0]])
        else:
            self.primary_key = kwargs.get("primary_key", [])

        orm_table = self._createORMTable(df, name, con, schema, primary_key=self.primary_key)
        table_exists = self._checkTable(name, con, schema)

        # Create / replace the table according to ``if_exists``.
        if table_exists:
            if if_exists == "fail":
                raise ValueError(f"table already exists :{name} ")
            if if_exists == "replace":
                orm_table.drop(bind=con)
                orm_table.create(bind=con)
            # 'append': keep the existing table as it is.
        else:
            orm_table.create(bind=con)

        # Over HTTP, None values are sent as empty strings. Over the native
        # TCP protocol they are not, and the server rejects them. Fill missing
        # values in ``df`` (e.g. with ``fillna``) before calling this in TCP mode.
        records = df.to_dict(orient="records")
        if records:  # executing an INSERT with an empty parameter list is invalid
            con.execute(orm_table.insert(), records)

# Usage: ``df`` is the DataFrame to write; ``table_name`` and ``schema_name``
# name the target table and database (replace them with real values).
cdf = ClickhouseDf()
with engine.connect() as conn:
    cdf.to_sql(df, table_name, conn, schema_name, if_exists="append")
21 浏览
15 爬虫
0 评论