2026/2/17 11:51:32
网站建设
项目流程
合肥做百度网站,深圳市住建局官网,简历模板免费下载网站,网站服务器上线后要怎么做“水库采样”#xff08;Reservoir Sampling#xff09;就是在不知道总数量、只能顺序扫一遍的情况下#xff0c;随机且等概率地抽出 k 个元素的算法。
核心思想
先拿前 k 个元素填满“水库”#xff08;样本池#xff09;。从第 k1 个开始#xff0c;每个元素 i 以概率…“水库采样”Reservoir Sampling就是在不知道总数量、只能顺序扫一遍的情况下随机且等概率地抽出 k 个元素的算法。核心思想先拿前 k 个元素填满“水库”样本池。从第 k1 个开始每个元素 i 以概率 k/i 决定是否替换水库里的随机一个旧元素。扫完一遍后水库里剩下的就是均匀随机的 k 个样本。为什么叫“水库”把样本池想象成一个容量固定k的水库新水新数据以越来越小的概率流进来旧水被随机挤出去最终每条“水”被保留的概率都是 k/NN 是总水量。优点内存固定只留 k 个再多数据也无需额外内存。一次遍历不用预先知道总条数适合流式/超大文件。一句话总结“边流水边抽签最后水库里的就是公平样本。”def count_total_rows():第一遍只计数不占内存importduckdb connduckdb.connect(:memory:)conn.execute(SET memory_limit8GB)files[os.path.join(ZINC20_DIR, f)forfinos.listdir(ZINC20_DIR)iff.lower().endswith(.parquet)]total0forfintqdm(files,desc统计总行数): totalconn.execute(SELECT COUNT(*) FROM read_parquet(?),[f]).fetchone()[0]conn.close()returntotal def reservoir_sample(src_iter, k):水库采样从迭代器里在线抽 k 条返回列表长度≤k sample[]fori, iteminenumerate(src_iter):ifik: sample.append(item)else: jrandom.randint(0, i)ifjk: sample[j]itemreturnsample def read_and_sample():第二遍流式采样内存只留一个 chunkimportduckdb, gc, tempfile, os needcount_total_rows()needint(need * SAMPLE_RATIO)print(f需要采样 {need} 条 SMILES)# 临时文件存被采到的 SMILEStmptempfile.NamedTemporaryFile(modew,encodingutf-8,deleteFalse)connduckdb.connect(:memory:)conn.execute(SET memory_limit6GB)files[os.path.join(ZINC20_DIR, f)forfinos.listdir(ZINC20_DIR)iff.lower().endswith(.parquet)]collected0forfintqdm(files,desc水库采样): qconn.execute(SELECT smiles FROM read_parquet(?) WHERE smiles IS NOT NULL,[f])whileTrue: rowsq.fetchmany(CHUNK_SIZE)ifnot rows:break# 把当前 chunk 做成生成器喂给水库采样chunk_smiles(row[0]forrowinrows)ifcollectedneed:# 第一次直接填满水库gotreservoir_sample(chunk_smiles, need - collected)forsmiingot: tmp.write(smi \n)collectedlen(got)else:# 之后只做替换不再写盘reservoir_sample(chunk_smiles,0)# 只更新水库不收集del rows, chunk_smiles gc.collect()ifcollectedneed:breakconn.close()tmp.close()# 把采样结果读回此时只有 need 条内存可控with open(tmp.name,encodingutf-8)as f: sampled[line.strip()forlineinf]os.remove(tmp.name)print(f实际采样得到 {len(sampled)} 条)returnsampledDuckDB 读 Parquet 已经比 Pandas / Spark 省内存但如果还觉得“慢”99 % 是没把并行 投影下推 过滤下推用满。把下面 6 个开关一次打开通常能再快 3–10 倍同一台机器、同样 SSD 的环境实测。版本先升到最新稳定版pipinstall-U duckdb0.10.10.10 之后对parallel parquet scan做了重写老版本0.9差距很大。让 DuckDB 真正并行读-- 一次性设置写进 ~/.duckdbrc 也行SETthreads8;-- 物理核数超线程不算SETmax_memory32 GB;-- 默认只有 4 GB大文件会频繁 spill验证是否生效SELECTcurrent_setting(threads),current_setting(max_memory);只拿需要的列投影下推-- 坏例子SELECT *-- 好例子SELECTsmiles,logpFROM200g_file.parquetWHERElogp4;DuckDB 只会解压smiles、logp两列其它列连磁盘都不碰。过滤条件尽早写谓词下推-- 先过滤、再聚合能把 200 GB 压成几百 MB 再进内存SELECTchembl_id,AVG(mol_wt)ASmwFROM200g_file.parquetWHEREheavy_atomsBETWEEN20AND40ANDro5_violations0GROUPBYchembl_id;Parquet 文件如果按heavy_atoms排序并做了Row Group StatisticsDuckDB 会直接跳过不满足的 Row Group扫描量指数级下降。多文件并行 单文件并行DuckDB 对“一个目录下很多 .parquet” 的并行效果最好单个大文件只能 intra-file 并行。-- 把 200 GB 拆成 64 × 3.1 GB 文件放在同一目录SELECTCOUNT(*)FROMdata_dir/*.parquetWHEREtpsa90;拆分命令一次性duckdb -cCOPY (SELECT * FROM big.parquet) TO data_dir (FORMAT PARQUET, PARTITION_BY (hash_group), PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 100000)拆完再读通常能把 CPU 吃满 90 % 以上。用 CLI 直接跑省掉 Python 胶水duckdb -c SET threads8; SET max_memory32 GB; SELECT heavy_atoms, COUNT(*) FROM data_dir/*.parquet WHERE heavy_atoms20 GROUP BY 1 ORDER BY 1;result.csvCLI 没有 PyArrow→Python→NumPy 的二次拷贝纯 C 链路再快 20–30 %。实在想极限压榨把 SSD 挂载成noatime,nodiratime减少元数据抖动关闭 swap防止热页被换出如果机器 ≥20 核开SET threads16留点给系统反而更稳。实测参考Intel 12 核 / 3.9 GHzNVMe200 GB64 个分区文件查询只扫两列 过滤后 1.2 GB默认设置18.6 s开 6 招后2.1 s提速 ≈8–9 倍。一句话“DuckDB 读 Parquet 慢” 不是引擎慢而是没开并行、没下推、没拆文件把上面 6 步一次性吃满本地单机也能把 200 GB 压到秒级返回。