当
search_after遇上高频 refresh,你的分页还稳得住吗?ElasticSearch 的
search_afterAPI 是深度分页的首选方案,但它有一个鲜为人知的"阿喀琉斯之踵"——在高频 refresh 场景下,排序可能变得不稳定,导致翻页出现重复或遗漏。本文将深入剖析这一问题的根因,并引出search_after+Point-in-Time (PIT)的完整解决方案,最后结合 QueryBuilder 的代码实现,梳理从接口设计到底层查询的完整链路。
一、ElasticSearch 的分页困境
1.1 传统 from + size 的天花板
ElasticSearch 最基础的分页方式是 from + size:
GET /my_index/_search
{
"from": 10000,
"size": 20,
"query": { "match_all": {} },
"sort": [{ "created_at": "desc" }]
}这种方式的问题众所周知——深度分页性能急剧下降。当 from 值很大时,ES 需要在每个分片上取出 from + size 条文档,然后在协调节点上做全局排序后丢弃前 from 条。当 from = 10000, size = 20 时,每个分片实际要返回 10020 条文档给协调节点。
ES 默认将 from + size 的上限设为 10000(index.max_result_window),超过即报错。
1.2 search_after:基于游标的深度分页
search_after API 是 ES 官方推荐的深度分页方案。它的核心思想是:不再使用偏移量,而是使用上一页最后一条文档的排序值作为"游标",直接定位到下一页的起始位置。
// 第一页
GET /my_index/_search
{
"size": 20,
"query": { "match_all": {} },
"sort": [
{ "created_at": "desc" },
{ "_id": "asc" }
]
}
// 第二页:使用第一页最后一条的 sort values
GET /my_index/_search
{
"size": 20,
"query": { "match_all": {} },
"sort": [
{ "created_at": "desc" },
{ "_id": "asc" }
],
"search_after": ["2026-05-17T10:30:00Z", "doc_999"]
}优势:
- 无论翻到多深,每次查询的开销都是恒定的(只取
size条) - 不受
max_result_window限制
但是,search_after 有一个关键前提——它假设两次请求之间,索引的数据和排序是稳定的。
二、高频 refresh 下的排序不稳定问题
2.1 问题根因
ElasticSearch 是一个近实时(Near Real-Time)搜索引擎。写入的文档不会立即可搜索,而是在 refresh 操作后才对搜索可见。默认的 refresh 间隔是 1 秒(index.refresh_interval)。
当你使用 search_after 进行跨请求分页时,两次请求之间可能发生以下情况:
- 新文档被 refresh 进来:如果新文档的排序值落在已翻过的页和当前页之间,会导致后续页的文档整体"后移",出现重复数据
- 旧文档被删除或更新:文档排序值变化后,可能导致某些文档被跳过
- 段合并(Segment Merge):后台的段合并可能改变文档的物理存储顺序,影响
_doc等隐式排序
用一张时间线来说明:
请求1(第1页) 请求2(第2页)
│ │
▼ ▼
┌─────────┐ refresh 发生 ┌─────────┐
│ 文档A │ ──────────► │ 文档X(新)│ ← 新文档插入
│ 文档B │ │ 文档A │ ← 文档A 重复出现!
│ 文档C │ │ 文档B │
│ ... │ │ ... │
└─────────┘ └─────────┘2.2 影响场景
这个问题在以下场景中尤为突出:
| 场景 | 影响程度 | 说明 |
|---|---|---|
| 数据导出 | 🔴 严重 | 全量遍历时数据重复或遗漏,导致导出结果不完整 |
| 后台管理列表翻页 | 🟡 中等 | 用户翻页时看到重复数据,体验差 |
| 移动端"加载更多" | 🟡 中等 | 下拉加载时出现已看过的内容 |
| 实时数据流消费 | 🔴 严重 | 消费位点不准确,可能重复处理或丢失数据 |
2.3 为什么 _id 排序也救不了?
有人可能会想:加一个唯一的 _id 作为 tiebreaker 排序字段不就行了?
确实,_id 可以保证排序的唯一性,但它无法阻止新文档插入到已翻过的区间。search_after 的语义是"给我排序值大于这个游标的文档"——如果 refresh 后出现了新的符合条件的文档,它们就会出现在结果中。
根本问题不在排序的唯一性,而在于查询视图的不一致性。
三、Point-in-Time (PIT):冻结索引快照
3.1 PIT 是什么?
Point-in-Time (PIT) 是 ES 7.10+ 引入的特性,它允许你创建一个索引的轻量级快照。在 PIT 的生命周期内,所有搜索请求都基于同一个快照执行——即使期间发生了 refresh、文档更新或段合并,查询结果都不会受影响。
PIT 创建
│
▼
┌──────────────────────────────────────┐
│ 索引快照(冻结视图) │
│ │
│ 请求1 ──► 基于快照查询 ──► 结果1 │
│ 请求2 ──► 基于快照查询 ──► 结果2 │ ← 数据一致!
│ 请求3 ──► 基于快照查询 ──► 结果3 │
│ │
└──────────────────────────────────────┘
│
PIT 关闭3.2 PIT 的工作原理
- 创建 PIT:调用
POST /my_index/_pit?keep_alive=1m,ES 返回一个pit_id - 使用 PIT 查询:在搜索请求中携带
pit_id,此时不需要指定索引名(PIT 已绑定索引) - 续期 PIT:每次查询时通过
keep_alive参数续期,ES 返回可能更新的pit_id - 关闭 PIT:遍历完成后主动关闭,释放 ES 资源
3.3 search_after + PIT = 完美分页
将 search_after 与 PIT 结合,就能实现既高效又一致的深度分页:
// 1. 创建 PIT
POST /my_index/_pit?keep_alive=1m
// 返回: { "id": "46ToAwMDaWR5BXV1..." }
// 2. 第一页查询(携带 PIT)
GET /_search
{
"size": 20,
"query": { "match_all": {} },
"pit": {
"id": "46ToAwMDaWR5BXV1...",
"keep_alive": "1m"
},
"sort": [
{ "created_at": "desc" },
{ "_id": "asc" }
]
}
// 3. 第二页查询(search_after + PIT)
GET /_search
{
"size": 20,
"query": { "match_all": {} },
"pit": {
"id": "46ToAwMDaWR5BXV1...", // 使用上一次返回的 pit_id
"keep_alive": "1m"
},
"sort": [
{ "created_at": "desc" },
{ "_id": "asc" }
],
"search_after": ["2026-05-17T10:30:00Z", "doc_999"]
}
// 4. 遍历完成后关闭 PIT
DELETE /_pit
{ "id": "46ToAwMDaWR5BXV1..." }四、QueryBuilder 中的实现:代码链路全梳理
QueryBuilder 是一个支持多数据源的 Go 查询构建器库。在最新版本中,ElasticSearchBuilder 已全面引入 search_after + PIT 方案。下面我们从接口设计到底层执行,逐层梳理整个链路。
4.1 新增的数据结构与字段
首先,ElasticSearchBuilder 新增了 pitKeepAlive 和 pitID 两个字段,分别用于控制 PIT 保活时长和支持跨请求的 PIT 分页:
type ElasticSearchBuilder[R any] struct {
builder[*ElasticSearchBuilder[R], R]
index string
filter elastic.Query
sort []elastic.Sorter
pitKeepAlive time.Duration // 新增:Point-in-Time 保持时间
pitID string // 新增:外部传入/内部更新的 PIT ID(用于跨请求分页)
}同时新增了 ESPITPageResult 结构体,作为 PIT 分页查询的返回结果:
// ESPITPageResult ES PIT 分页查询结果。
// PitID 与 CursorValues 可用于下一批查询。
type ESPITPageResult[R any] struct {
List []*R // 当前页数据
Total int64 // 总数(仅首页查询时返回)
HasMore bool // 是否还有下一页
PitID string // 下一页使用的 PIT ID
CursorValues []any // 下一页使用的游标值
}以及新增的错误定义:
// ErrPITCursorWithoutPITID ElasticSearch 单批次分页查询模式下未提供 PIT ID 的错误
ErrPITCursorWithoutPITID = errors.New("PIT ID is required when cursor values are provided")这个错误用于防御性校验——如果调用方提供了 cursorValues(说明不是第一页),但没有提供 pitID,说明 PIT 会话已丢失,此时应该拒绝查询而非静默返回错误数据。
4.2 新增 API:SetPitKeepAlive、SetPITID 与 QueryPageWithPIT
SetPitKeepAlive:设置 PIT 的保活时长,控制 ES 快照的存活窗口:
func (e *ElasticSearchBuilder[R]) SetPitKeepAlive(keepAlive time.Duration) *ElasticSearchBuilder[R] {
e.pitKeepAlive = keepAlive
return e
}如果不设置,默认使用 1 分钟(由 pitKeepAliveString() 内部兜底)。对于耗时较长的遍历或翻页间隔较大的场景,建议适当调大此值(如 2 * time.Minute),避免 PIT 在两次请求之间过期。
SetPITID:允许外部传入 PIT ID,用于跨请求续查:
func (e *ElasticSearchBuilder[R]) SetPITID(pitID string) *ElasticSearchBuilder[R] {
e.pitID = pitID
return e
}QueryPageWithPIT:核心新增方法,执行基于 PIT + search_after 的单批次分页查询:
func (e *ElasticSearchBuilder[R]) QueryPageWithPIT(ctx context.Context) (*ESPITPageResult[R], error) {
if err := e.builder.prepareAndValidate(); err != nil {
return nil, err
}
if e.index == "" {
return nil, errors.New("elasticsearch index not configured")
}
// 防御性校验:有游标值但没有 PIT ID,说明会话已丢失
if len(e.builder.cursorValues) > 0 && e.pitID == "" {
return nil, ErrPITCursorWithoutPITID
}
// 临时覆盖分页标志,确保 QueryMeta 和执行语义一致
oldNeedPagination := e.builder.needPagination
oldIsCursorQuery := e.builder.isCursorQuery
oldPitID := e.pitID
defer func() {
e.builder.needPagination = oldNeedPagination
e.builder.isCursorQuery = oldIsCursorQuery
e.pitID = oldPitID
}()
e.builder.needPagination = true
e.builder.isCursorQuery = true
isFirstBatch := len(e.builder.cursorValues) == 0
var (
nextCursorValues []any
hasMore bool
resultPitID string
)
// 通过中间件管道执行查询
list, total, err := e.builder.executeWithMiddlewares(ctx, func(ctx context.Context) ([]*R, int64, error) {
batchList, batchNextCursorValues, batchTotal, batchHasMore, queryErr := e.doCursorQuery(
ctx, e.builder.cursorValues, isFirstBatch, true, &e.pitID,
)
if queryErr != nil {
return nil, 0, queryErr
}
nextCursorValues = batchNextCursorValues
hasMore = batchHasMore
resultPitID = e.pitID
return batchList, batchTotal, nil
})
if err != nil {
return nil, err
}
result := &ESPITPageResult[R]{
List: list,
Total: total,
HasMore: hasMore,
PitID: resultPitID,
CursorValues: nextCursorValues,
}
// 最后一页:清空 PIT 信息,提示调用方无需继续
if !hasMore {
result.PitID = ""
result.CursorValues = nil
}
return result, nil
}设计要点:
- 临时覆盖 + defer 恢复:
QueryPageWithPIT临时将needPagination和isCursorQuery设为true,确保QueryMeta中的元信息与实际执行语义一致(中间件可以正确感知这是一次游标分页查询),查询完成后通过defer恢复原值,不影响构建器的后续使用。 - 中间件管道复用:通过
executeWithMiddlewares执行,意味着缓存、日志、指标等中间件对 PIT 查询同样生效。 - 最后一页自动清理:当
hasMore为false时,自动清空返回结果中的PitID和CursorValues,向调用方明确传达"没有下一页了"的语义。
4.3 底层引擎重构:doCursorQuery 的演进
doCursorQuery 是整个游标分页的核心执行引擎。在引入 PIT 之前,它是一个纯粹的 search_after 查询函数,不涉及任何 PIT 管理。这次重构涉及签名变更、PIT 管理策略和 hasMore 判断逻辑三个维度。
签名变更
// 旧版(无 PIT 支持)
func (e *ElasticSearchBuilder[R]) doCursorQuery(
ctx context.Context, cursorValues []any, isFirstBatch bool,
) ([]*R, []any, int64, error)
// 新版(PIT + hasMore)
func (e *ElasticSearchBuilder[R]) doCursorQuery(
ctx context.Context, cursorValues []any,
isFirstBatch, forcePIT bool,
pitID *string,
) (list []*R, nextCursorValues []any, total int64, hasMore bool, err error)三个关键变化:
- 新增
pitID *string参数:用于在多批次查询间传递和更新 PIT ID - 新增
forcePIT参数:区分"全量遍历自动 PIT"和"跨请求强制 PIT"两种场景 - 新增
hasMore返回值:用于QueryPageWithPIT判断是否还有下一页
从直连索引到 PIT 快照
旧版 doCursorQuery 直接通过索引名查询,没有任何快照保障:
// 旧版:直接使用 Index 查询,无 PIT 支持
searchService := e.builder.data.ElasticSearch.Search().
Index(e.index).
Query(filter).
Size(batchSize)新版引入了 forcePIT 和 usePIT 的双层控制:
querySize := batchSize
if forcePIT {
// PIT 分页场景通过多取 1 条记录来判断是否还有下一页
querySize = batchSize + 1
}
usePIT := forcePIT || !e.builder.needPaginationforcePIT = true(来自QueryPageWithPIT):强制使用 PIT,并多取 1 条用于hasMore探测!needPagination(来自QueryCursor全量遍历):自动启用 PIT 以保证遍历一致性- 两者都为
false时,退化为旧版行为——直接使用Index(e.index)查询
PIT 生命周期管理
usePIT := forcePIT || !e.builder.needPagination
openedPITByThisCall := false
defer func() {
// 如果本次调用打开了 PIT 但查询失败,主动关闭以避免资源泄漏
if err != nil && openedPITByThisCall && pitID != nil && *pitID != "" {
e.closePIT(*pitID)
*pitID = ""
}
}()
if usePIT {
if pitID == nil {
return nil, nil, 0, false, errors.New("pitID pointer is nil")
}
if *pitID == "" {
// 首次调用:自动打开 PIT
openResp, err := e.builder.data.ElasticSearch.OpenPointInTime(e.index).
KeepAlive(e.pitKeepAliveString()).Do(ctx)
if err != nil {
return nil, nil, 0, false, err
}
*pitID = openResp.Id
openedPITByThisCall = true
}
// 使用 PIT 查询时不指定索引名(PIT 已绑定索引)
searchService = searchService.PointInTime(
elastic.NewPointInTimeWithKeepAlive(*pitID, e.pitKeepAliveString()),
)
} else {
searchService = searchService.Index(e.index)
}关键设计:
openedPITByThisCall标记用于错误回滚——如果是本次调用打开的 PIT,但后续查询失败了,需要主动关闭以避免 ES 资源泄漏- PIT 查询时不指定索引名,因为 PIT 已经绑定了索引
hasMore 探测与结果裁剪
hits := searchResult.Hits.Hits
hasMore = forcePIT && len(hits) > batchSize
effectiveHits := hits
if hasMore {
effectiveHits = hits[:batchSize] // 裁掉多取的那 1 条
}
list = make([]*R, 0, len(effectiveHits))
for _, hit := range effectiveHits {
var item R
if err := json.Unmarshal(hit.Source, &item); err != nil {
return nil, nil, 0, false, err
}
list = append(list, &item)
}这是一个经典的 "多取一条"探测模式:请求 batchSize + 1 条数据,如果实际返回了 batchSize + 1 条,说明还有下一页;否则当前就是最后一页。返回给调用方时裁掉多取的那一条。
最后一页自动关闭 PIT
if forcePIT && !hasMore && *pitID != "" {
e.closePIT(*pitID)
*pitID = ""
}当 forcePIT 模式下检测到最后一页时,自动关闭 PIT 释放 ES 资源。这确保了即使调用方忘记手动关闭,PIT 也不会一直占用资源。
4.4 新增 closePIT:PIT 资源释放
旧版 ElasticSearchBuilder 没有 PIT 相关逻辑,自然也没有 closePIT 方法。新版新增了这个方法,负责在 PIT 不再需要时主动释放 ES 资源:
const esPITCloseTimeout = 3 * time.Second
func (e *ElasticSearchBuilder[R]) closePIT(pitID string) {
if pitID == "" {
return
}
closeCtx, cancel := context.WithTimeout(context.Background(), esPITCloseTimeout)
defer cancel()
_, _ = e.builder.data.ElasticSearch.ClosePointInTime(pitID).Do(closeCtx)
}设计要点:
- 超时时间提取为常量
esPITCloseTimeout,提高可维护性 - 使用独立的
context.Background()创建超时上下文,避免受调用方 context 取消的影响 - 忽略关闭错误(
_, _),因为 PIT 关闭失败不应影响业务流程,ES 会在keep_alive过期后自动回收 closePIT的调用时机由上层精确控制:QueryCursor通过defer在迭代结束后关闭,doCursorQuery在最后一页自动关闭,错误时通过openedPITByThisCall标记回滚
4.5 QueryCursor 的重构
旧版 QueryCursor 直接将 doCursorQuery 作为回调传入,因为两者签名完全匹配:
// 旧版:签名匹配,直接传入
func (e *ElasticSearchBuilder[R]) QueryCursor(ctx context.Context) iter.Seq2[*R, error] {
// ...
return e.builder.executeCursorWithMiddlewares(ctx, e.doCursorQuery)
}新版由于 doCursorQuery 签名变更(新增 forcePIT、pitID、hasMore),需要通过包装函数适配,同时引入 PIT 生命周期管理:
// 新版:包装适配 + PIT 生命周期管理
func (e *ElasticSearchBuilder[R]) QueryCursor(ctx context.Context) iter.Seq2[*R, error] {
// ...
var pitID string
wrappedCursorQuery := func(ctx context.Context, cursorValues []any, isFirstBatch bool) ([]*R, []any, int64, error) {
// forcePIT = false:由 needPagination 控制是否启用 PIT
list, nextCursorValues, total, _, err := e.doCursorQuery(ctx, cursorValues, isFirstBatch, false, &pitID)
return list, nextCursorValues, total, err
}
innerIter := e.builder.executeCursorWithMiddlewares(ctx, wrappedCursorQuery)
return func(yield func(*R, error) bool) {
defer func() {
e.closePIT(pitID) // 迭代结束后关闭 PIT
}()
for item, err := range innerIter {
if !yield(item, err) {
return
}
}
}
}关键变化:
- 通过
wrappedCursorQuery闭包适配新签名,传入forcePIT = false和局部pitID变量 hasMore返回值用_丢弃,因为全量遍历模式下,是否继续由"本批次是否返回了数据"决定- 新增
defer e.closePIT(pitID),确保迭代结束(正常完成或提前break)后自动释放 PIT 资源
4.6 结果解析的重构:从回调内解析到统一后处理
旧版 doCursorQuery 在搜索回调内部直接解析 hits 并追加到 list:
// 旧版:在搜索回调内部解析
if err := util.WaitAndGo(func() error {
searchResult, err = searchService.Do(ctx)
if err != nil {
return err
}
// 解析与搜索耦合在同一个回调中
for _, hit := range searchResult.Hits.Hits {
var item R
if err := json.Unmarshal(hit.Source, &item); err != nil {
return err
}
list = append(list, &item)
}
return nil
}, func() error {
// Count 查询...
})新版将搜索和解析分离——回调只负责执行搜索和更新 PIT ID,hits 的解析和裁剪在并行任务完成后统一处理:
// 新版:搜索与解析分离
if err = util.WaitAndGo(func() error {
searchResult, err = searchService.Do(ctx)
if err != nil {
return err
}
// 只更新 PIT ID,不解析 hits
if usePIT && *pitID != "" && searchResult.PitId != "" {
*pitID = searchResult.PitId
}
return nil
}, func() error {
// Count 查询...
})
// 并行任务完成后,统一处理 hits
hits := searchResult.Hits.Hits
hasMore = forcePIT && len(hits) > batchSize
effectiveHits := hits
if hasMore {
effectiveHits = hits[:batchSize] // 裁掉探测用的多余 1 条
}
list = make([]*R, 0, len(effectiveHits))
for _, hit := range effectiveHits {
var item R
if err := json.Unmarshal(hit.Source, &item); err != nil {
return nil, nil, 0, false, err
}
list = append(list, &item)
}这种分离设计使得 hasMore 探测和结果裁剪逻辑可以在解析前完成,避免先解析再丢弃的浪费。
此外,Count 查询的触发条件也做了修正。旧版有一个额外的 e.afterHook == nil 条件:
// 旧版
if !isFirstBatch || !e.builder.needTotal || e.afterHook == nil {
return nil
}新版移除了这个条件:
// 新版
if !isFirstBatch || !e.builder.needTotal {
return nil
}这是因为 needTotal 本身就是控制是否查询总数的开关,不应该与 afterHook 耦合。即使没有设置 AfterQueryHook,如果用户明确要求了 needTotal,总数也应该被查询并返回。
4.7 Clone 的完善
Clone 方法也同步拷贝了新增的 pitID 字段:
func (e *ElasticSearchBuilder[R]) Clone() *ElasticSearchBuilder[R] {
cloned := &ElasticSearchBuilder[R]{
index: e.index,
filter: e.filter,
pitKeepAlive: e.pitKeepAlive, // 新增:拷贝 PIT 保活时长
pitID: e.pitID, // 新增:拷贝 PIT ID
}
// ...
}4.8 完整调用链路图
┌─────────────────────────────────────────────────────────────────┐
│ 业务层 │
│ │
│ 场景A: 全量遍历(数据导出) 场景B: 跨请求分页(API 翻页) │
│ ┌──────────────────────┐ ┌──────────────────────────┐ │
│ │ b.SetNeedPagination │ │ b.SetPITID(prevPitID) │ │
│ │ (false) │ │ b.SetCursorValue(...) │ │
│ │ b.QueryCursor(ctx) │ │ b.QueryPageWithPIT(ctx) │ │
│ └──────────┬───────────┘ └──────────┬───────────────┘ │
└─────────────┼──────────────────────────────────┼────────────────┘
│ │
▼ ▼
┌─────────────────────────┐ ┌──────────────────────────────────┐
│ QueryCursor │ │ QueryPageWithPIT │
│ │ │ │
│ forcePIT = false │ │ forcePIT = true │
│ usePIT = !needPaginate │ │ usePIT = true (强制) │
│ hasMore: 忽略 │ │ hasMore: 多取1条探测 │
│ PIT关闭: defer │ │ PIT关闭: 最后一页自动关闭 │
└──────────┬──────────────┘ └──────────┬───────────────────────┘
│ │
└──────────┬───────────────────┘
▼
┌────────────────────────┐
│ doCursorQuery │
│ │
│ 1. 按需打开 PIT │
│ 2. 构建 search_after │
│ 3. 并行: 查询 + Count │
│ 4. 解析 hits │
│ 5. 提取 sort values │
│ 6. 按需关闭 PIT │
└────────────────────────┘五、使用示例
5.1 全量遍历(数据导出场景)
b := builder.NewElasticSearchBuilder[Doc](
builder.NewDBProxy(nil, nil, esClient), "my_index",
)
b.SetFilter(elastic.NewTermQuery("status", "active"))
b.SetCursorField("created_at")
b.SetSort(elastic.NewFieldSort("_id").Asc())
b.SetLimit(500)
b.SetNeedPagination(false) // 关闭分页 → 自动启用 PIT
b.SetPitKeepAlive(2 * time.Minute) // PIT 保活时长
for doc, err := range b.QueryCursor(ctx) {
if err != nil {
log.Printf("遍历错误: %v", err)
break
}
export(doc)
}
// PIT 在迭代结束后自动关闭(defer)5.2 跨请求分页(API 翻页场景)
// 首页请求
es := builder.NewElasticSearchBuilder[Doc](
builder.NewDBProxy(nil, nil, esClient), "my_index",
)
es.SetFilter(elastic.NewMatchAllQuery()).
SetCursorField("created_at", "id").
SetLimit(20)
page, err := es.QueryPageWithPIT(ctx)
// 持久化 page.PitID + page.CursorValues → 编码为 page_token 返回给客户端
// 下一页请求:从 page_token 中解码出 pitID 和 cursorValues
es.SetPITID(prevPitID).SetCursorValue(prevCursorValues...)
page, err = es.QueryPageWithPIT(ctx)
// 最后一页:page.HasMore == false, page.PitID == ""
// PIT 已自动关闭,无需手动处理后端 API 业务层数据组装参考
在实际的后端 API 中,QueryPageWithPIT 返回的 ESPITPageResult 需要组装为前后端约定的分页协议。推荐的接口契约如下:
请求参数:
| 字段 | 类型 | 说明 |
|---|---|---|
page_size | integer | 每页条数 |
page_token | string | 不透明的分页令牌(首页为空) |
响应参数:
| 字段 | 类型 | 说明 |
|---|---|---|
items | array | 当前页数据 |
next_page_token | string | 下一页令牌(无更多数据时为空) |
has_more | boolean | 是否还有下一页 |
page_token 生成策略
page_token 本质上是对 pit_id + cursor_values 的序列化封装。推荐的生成流程:
- 构建载荷:
{"pit_id":"...","cursor_values":[...],"exp":...,"v":1} - 序列化并编码:JSON 序列化后进行 Base64URL 编码
- 完整性保护:根据安全需求添加 HMAC 签名或 AES-GCM 加密
- 请求时校验:每次请求解码
page_token后,校验版本号、过期时间、签名,通过后再调用SetPITID+SetCursorValue
// page_token 载荷结构示例
type PageToken struct {
PitID string `json:"pit_id"`
CursorValues []any `json:"cursor_values"`
Exp int64 `json:"exp"` // 过期时间戳
Version int `json:"v"` // 版本号,用于兼容性控制
}
// 编码:ESPITPageResult → page_token
func EncodePageToken(result *builder.ESPITPageResult[Doc]) string {
if !result.HasMore {
return "" // 最后一页,无需生成 token
}
token := PageToken{
PitID: result.PitID,
CursorValues: result.CursorValues,
Exp: time.Now().Add(5 * time.Minute).Unix(),
Version: 1,
}
data, _ := json.Marshal(token)
// 实际生产中应添加 HMAC 签名或加密
return base64.RawURLEncoding.EncodeToString(data)
}
// 解码:page_token → pitID + cursorValues
func DecodePageToken(tokenStr string) (*PageToken, error) {
data, err := base64.RawURLEncoding.DecodeString(tokenStr)
if err != nil {
return nil, fmt.Errorf("invalid page token: %w", err)
}
var token PageToken
if err := json.Unmarshal(data, &token); err != nil {
return nil, fmt.Errorf("invalid page token payload: %w", err)
}
// 校验版本号和过期时间
if token.Version != 1 {
return nil, fmt.Errorf("unsupported page token version: %d", token.Version)
}
if time.Now().Unix() > token.Exp {
return nil, fmt.Errorf("page token expired")
}
return &token, nil
}集成使用建议:
- PIT 有
keep_alive窗口,如果 PIT 过期或失效,应从首页重新开始并创建新的 PIT - 保持稳定的排序键(例如:业务时间戳 + 唯一 ID),使
search_after的结果具有确定性 HasMore通过limit+1探测计算得出,可作为分页提示,但仍应以返回的 cursor/token 作为真实依据
六、总结
| 方案 | 一致性 | 深度分页 | 资源开销 | 适用场景 |
|---|---|---|---|---|
from + size | ❌ 不一致 | ❌ 有上限 | 高(深度越大越高) | 浅层分页 |
search_after | ❌ 不一致 | ✅ 无上限 | 低(恒定) | 单次请求内的游标遍历 |
search_after + PIT | ✅ 一致 | ✅ 无上限 | 低(恒定 + PIT 快照) | 跨请求分页、数据导出 |
search_after + PIT 方案已在 QueryBuilder 中完整实现,提供了两种使用模式:
QueryCursor:全量遍历迭代器,SetNeedPagination(false)时自动启用 PIT,适用于数据导出等批处理场景QueryPageWithPIT:单批次 PIT 分页查询,返回ESPITPageResult(含PitID、CursorValues、HasMore),适用于跨请求的 API 翻页场景
核心设计决策:
forcePIT双层控制:通过forcePIT参数区分"自动 PIT"和"强制 PIT"两种场景,复用同一个doCursorQuery处理- 多取一条探测:
QueryPageWithPIT通过请求batchSize + 1条数据来判断hasMore,避免额外的 Count 查询 - PIT 生命周期自动管理:错误时回滚、最后一页自动关闭、迭代结束时 defer 关闭——三重保障防止资源泄漏
- 中间件管道复用:
QueryPageWithPIT通过executeWithMiddlewares执行,日志、指标、缓存等中间件无缝生效
一句话总结:search_after 解决了深度分页的性能问题,PIT 解决了跨请求的一致性问题——两者结合,才是 ElasticSearch 分页的终极方案。