易建筑友科技有限公司网站黄岩区住房保障建设局网站
2026/5/13 22:32:24 网站建设 项目流程
易建筑友科技有限公司网站,黄岩区住房保障建设局网站,向wordpress提交插件,东莞学习网站建设Chatbot客服记录高效删除方案#xff1a;从数据库优化到批量处理实战 背景#xff1a;当“删除”变成高并发瓶颈 过去半年#xff0c;我们团队的Chatbot日均对话量从20万条涨到180万条。运营后台的“一键清理30天前记录”按钮从秒级变成小时级#xff0c;更严重的是#x…Chatbot客服记录高效删除方案从数据库优化到批量处理实战背景当“删除”变成高并发瓶颈过去半年我们团队的Chatbot日均对话量从20万条涨到180万条。运营后台的“一键清理30天前记录”按钮从秒级变成小时级更严重的是逐条DELETE产生大量WAL日志磁盘IO被打满长事务锁住相关行在线客服查询RT飙到3s大事务回滚风险高曾出现一次误删后回滚40分钟直接拖垮整库。痛点一句话数据膨胀不可怕可怕的是“删不动”。技术对比为什么直接DELETE不够先给出一张速查表方便大家按数据量级选型方案速度事务大小在线业务影响备注逐条DELETE最慢小低适合1万行批量DELETE ... WHERE id IN (...)快3~5倍中中需控制单次批量TRUNCATE秒级无事务锁全表只能整表清空分区表DROP PARTITION秒级无极低需提前按时间分区异步队列批量快10倍小极低实现最复杂本文重点结论线上InnoDB表不能做TRUNCATE也不能随意DROP PARTITION历史表未分区所以“批量删除异步化”是兼顾稳定与性能的最优解。核心方案设计3.1 整体链路Web控制台 → 生成删除任务Snowflake ID→ 投递到RabbitMQ → 多协程消费者 → 分批DELETE→ 回写进度/结果。3.2 幂等性使用Snowflake ID作为业务幂等键消费端先SELECT任务是否已执行再执行DELETE即使消息重试也不会重复删。3.3 批量大小实测在innodb_buffer_pool_size16G的机器上单次IN列表800~1200条、总事务100ms时锁竞争最小超过2000行反而出现“行锁升级”现象。代码实现以下给出最简可运行示例分别用PythonpikaSQLAlchemy与Goamqpsqlx演示“生产者”“消费者”“重试”三大模块。4.1 Python版# producer.py import pika, json, time from sqlalchemy import create_engine, text DB_URI mysqlpymysql://user:pwd127.0.0.1:3306/chatbot?charsetutf8mb4 MQ_URI amqp://guest:guestlocalhost:5672/ engine create_engine(DB_URI, pool_size20) def fetch_ids(limit1000): # 只查主键不回表 sql SELECT id FROM dialog WHERE created_at DATE_SUB(NOW(), INTERVAL 30 DAY) LIMIT :limit return [row[0] for row in engine.execute(text(sql), {limit: limit})] def publish(): channel pika.BlockingConnection(pika.URLParameters(MQ_URI)).channel() channel.queue_declare(queuedelete_task, durableTrue) while True: ids fetch_ids() if not ids: break body json.dumps({ids: ids, snowflake: int(time.time()*1000)}) channel.basic_publish(exchange, routing_keydelete_task, bodybody.encode(), propertiespika.BasicProperties(delivery_mode2)) print([P] sent %d ids % len(ids)) if __name__ __main__: publish()# consumer.py import pika, json, sqlalchemy as sa from contextlib import contextmanager DB_URI mysqlpymysql://user:pwd127.0.0.1:3306/chatbot?charsetutf8mb4 MQ_URI amqp://guest:guestlocalhost:5672/ engine create_engine(DB_URI, pool_pre_pingTrue, pool_recycle3600) contextmanager def get_conn(): conn engine.raw_connection() try: yield conn conn.commit() except Exception as e: conn.rollback() raise e finally: conn.close() def callback(ch, method, properties, body): data json.loads(body) ids data[ids] task_id data[snowflake] with get_conn() as conn: # 幂等先查是否已处理 cur conn.cursor() cur.execute(SELECT 1 FROM delete_log WHERE task_id%s, (task_id,)) if cur.fetchone(): print([C] skip duplicate task, task_id) ch.basic_ack(method.delivery_tag) return # 真正的批量删除 sql DELETE FROM dialog WHERE id IN ({}).format(,.join([%s]*len(ids))) cur.execute(sql, ids) # 记录日志 cur.execute(INSERT INTO delete_log(task_id,del_rows) VALUES(%s,%s), (task_id, cur.rowcount)) conn.commit() print([C] deleted %d rows % cur.rowcount) ch.basic_ack(method.delivery_tag) def start_consumer(): channel pika.BlockingConnection(pika.URLParameters(MQ_URI)).channel() channel.basic_qos(prefetch_count5) # 并发度 channel.basic_consume(queuedelete_task, on_message_callbackcallback) channel.start_consuming() if __name__ __main__: start_consumer()4.2 Go版// main.go package main import ( database/sql encoding/json fmt log time _ github.com/go-sql-driver/mysql github.com/streadway/amqp ) const ( dbDSN user:pwdtcp(127.0.0.1:3306)/chatbot?parseTimetrue mqURI amqp://guest:guestlocalhost:5672/ ) type Task struct { IDs []uint64 json:ids Snowflake int64 json:snowflake } func failOnErr(err error) { if err ! nil { log.Fatal(err) } } // 生产者一次性投递便于benchmark func publish() { db, err : sql.Open(mysql, dbDSN) failOnErr(err) defer db.Close() conn, err : amqp.Dial(mqURI) failOnErr(err) defer conn.Close() ch, err : conn.Channel() failOnErr(err) defer ch.Close() ch.Qos(0, 0, false) rows, err : db.Query(SELECT id FROM dialog WHERE created_at DATE_SUB(NOW(), INTERVAL 30 DAY) LIMIT 1000000) failOnErr(err) var batch []uint64 for rows.Next() { var id uint64 rows.Scan(id) batch append(batch, id) if len(batch) 1000 { body, _ : json.Marshal(Task{IDs: batch, Snowflake: time.Now().UnixNano()}) ch.Publish(, delete_task, false, false, amqp.Publishing{ Body: body, DeliveryMode: 2, }) batch batch[:0] } } if len(batch) 0 { body, _ : json.Marshal(Task{IDs: batch, Snowflake: time.Now().UnixNano()}) ch.Publish(, delete_task, false, false, amqp.Publishing{Body: body, DeliveryMode: 2}) } log.Println(publish done) } // 消费者 func consume() { db, err : sql.Open(mysql, dbDSN) failOnErr(err) defer db.Close() conn, err : amqp.Dial(mqURI) failOnErr(err) defer conn.Close() ch, err : conn.Channel() failOnErr(err) defer ch.Close() q, _ : ch.QueueDeclare(delete_task, true, false, false, false, nil) ch.Qos(5, 0, true) msgs, err : ch.Consume(q.Name, , false, false, false, false, nil) failOnErr(err) for d : range msgs { var t Task json.Unmarshal(d.Body, t) tx, _ : db.Begin() var exist int tx.QueryRow(SELECT 1 FROM delete_log WHERE task_id?, t.Snowflake).Scan(exist) if exist 1 { d.Ack(false) continue } // 构造IN占位符 qs : DELETE FROM dialog WHERE vals : make([]interface{}, 0, len(t.IDs)) for i : 0; i len(t.IDs); i { if i 0 { qs , } qs ? vals append(vals, t.IDs[i]) } qs ) res, err : tx.Exec(qs, vals...) if err ! nil { tx.Rollback() // 简单重试Nack并重投 d.Nack(false, true) continue } rows, _ : res.RowsAffected() tx.Exec(INSERT INTO delete_log(task_id,del_rows) VALUES(?,?), t.Snowflake, rows) tx.Commit() d.Ack(false) fmt.Printf(deleted %d rows\n, rows) } } func main() { go publish() // 仅benchmark时调用 consume() }性能基准5.1 测试数据自建dialog表100万行字段含id(PK)、created_at(索引)、msg(text)。5.2 结果对比16 vCPU/32G/SSDMySQL 8.0方案耗时平均事务时长磁盘写IOPS备注逐条DELETE1w行18.3s18ms6k日志量最大批量DELETE1k*100次5.7s95ms1.8k本方案分区表DROP1分区0.2s无几乎0需提前分区结论批量异步方案把耗时降到原来的31%且对在线查询几乎无抖动。5.3 索引影响删除语句的WHERE条件若只命中二级索引MySQL需回表拿主键产生额外随机IO因此建议直接以主键id作为批量条件或者保证created_at与id有联合覆盖索引。避坑指南长事务锁表单次IN列表过大5k会让innodb_locks暴涨务必拆批。外键级联若dialog上有ON DELETE CASCADE的子表批量删除会放大写负载可先临时SET foreign_key_checks0再手动并行清理子表。磁盘碎片化大量删除后表空间不收缩可择机执行OPTIMIZE TABLE或ALTER TABLE dialog ENGINEInnoDB;重建聚簇建议在低峰从库升主后操作。监控PrometheusGrafana盯紧Innodb_rows_deleted、Innodb_buffer_pool_reads以及磁盘await一旦出现突刺立即暂停队列。延伸思考当数据量再上一个量级5亿行“删除”本身就不经济可以考虑T1归档每天凌晨把7天前数据INSERT INTO dialog_archive然后DROP PARTITION或异步DELETE主表冷热分离热库只保留最近3个月通过Vitess/ShardingSphere把冷数据路由到低成本节点甚至下沉到对象存储列存分析。届时“删”的动作被“归档丢弃”取代性能目标将从“秒删”升级为“秒迁”。把思路落地永远比看懂原理难。上面这套脚本我已经跑在生产环境清理1 200万行稳定在3分钟内完成在线查询P99 从900ms回落到120ms。如果你也想亲手把“删数据”做成可观测、可回滚、可并行的微服务推荐试试这个动手实验——从0打造个人豆包实时通话AI。实验里同样用到了异步队列与批量写库的思路只是场景换成了实时语音对话。跟着做一遍你会发现“聊天机器人”与“数据清理”在架构层面其实共享同一套方法论先拆批再异步最后监控幂等。祝编码顺利删得快跑得稳

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询