← 返回文档中心

价格同步模块(Price Sync)

优先级: P0(直采供应商核心依赖) 开发周期: 2 周 依赖: supplier-adapter(供应商适配器)、inventory(库存快照)、worker(定时任务)


1. 概述

直采供应商(direct_procure)的价格和库存不是实时查询的,而是通过主动拉取供应商推送两种方式预先同步到平台。本模块负责:

1.1 核心原则

  1. 兼容性优先:兼容"有价即有房"(available_rooms=NULL)和"有库存数"两种模式
  2. 大数据量设计:所有操作必须支持万级供应商酒店 × 百日级日期范围的并发处理
  3. 最终一致:同步数据允许短暂延迟(秒级),不需要强一致
  4. 天然去重:使用 DB UPSERT 语义,不依赖 MQ 去重

1.2 与直连供应商的关系

维度 直连 direct_connect 直采 direct_procure
查价 实时调供应商 API 查本地 snapshot + Redis
下单/取消/确认 调供应商 API 同左,也调供应商 API
价格来源 供应商 API 实时返回 拉取/推送预先同步
库存管理 不管理,有价即有房 可能管理,也可能有价即有房
数据时效 实时 取决于同步频率

2. 数据写入方案

2.1 方案选型:直接写 DB + 异步更新缓存

供应商推送/平台拉取 → 解析数据 → 转标准格式 → 批量 UPSERT DB → 异步更新 Redis

不使用 MQ 中转的原因: - UPSERT 天然去重——同一数据点(supplier_code + hotel_id + room_type + stay_date)无论写入多少次,结果都是最新的 - 避免 MQ 堆积问题(旧消息堵住新消息) - 减少一层中转,链路更简单,大数据量下更可控

2.2 写入流程

┌─────────────────┐     ┌──────────────┐     ┌─────────────┐     ┌───────┐
│  数据来源        │────▶│  标准格式转换  │────▶│  批量UPSERT  │────▶│ Redis │
│  拉取 / 推送     │     │  (适配器层)   │     │  PostgreSQL  │     │ 异步  │
└─────────────────┘     └──────────────┘     └─────────────┘     └───────┘

2.3 批量 UPSERT 优化


3. 标准数据格式

3.1 平台标准结构

// SyncData 单条同步数据的标准格式
type SyncData struct {
    SupplierCode    string   `json:"supplier_code"`
    SupplierHotelID string   `json:"supplier_hotel_id"`
    RoomTypeCode    string   `json:"room_type_code"`
    StayDate        string   `json:"stay_date"`          // YYYY-MM-DD
    GrossPrice      float64  `json:"gross_price"`        // 卖价(含加价)
    NetPrice        float64  `json:"net_price"`          // 采购价
    Currency        string   `json:"currency"`           // CNY/USD/...
    AvailableRooms  *int     `json:"available_rooms"`    // NULL = 有价即有房
    MealPlan        string   `json:"meal_plan"`          // BB/HB/FB/...
    DataHash        string   `json:"data_hash"`          // 数据内容 hash,用于变更检测
    SyncTimestamp   int64    `json:"sync_timestamp"`     // 供应商侧数据时间戳(毫秒)
}

3.2 available_rooms 语义

含义 行为
NULL 有价即有房 查价时始终返回,不限库存数
0 售罄 查价时不返回
> 0 有库存限制 查价时返回,扣库存用

4. 供应商适配器架构

4.1 目录结构

services/supplier-adapter/
  internal/
    standard/              # 平台标准定义
      types.go             # SyncData、SyncRequest、SyncResponse
      adapter.go           # SupplierSyncAdapter 接口
      registry.go          # 适配器注册表
    providers/             # 各供应商适配包(每个供应商一个目录)
      demo/                # 示例供应商
        adapter.go         # 实现 SupplierSyncAdapter
        parser.go          # 原始数据 → SyncData 转换
        client.go          # 供应商 API 调用封装
        webhook.go         # Webhook 签名验证 + 解析
      booking/             # Booking.com 适配
        ...

4.2 标准接口定义

// SupplierSyncAdapter 供应商同步适配器接口
type SupplierSyncAdapter interface {
    // Code 返回供应商编码
    Code() string

    // Pull 主动拉取指定酒店列表的价格/库存
    // 返回标准格式的数据切片
    Pull(ctx context.Context, req SyncRequest) ([]SyncData, error)

    // ParsePush 解析供应商推送的数据
    // 负责签名验证 + 格式转换
    ParsePush(ctx context.Context, body []byte, headers map[string]string) ([]SyncData, error)

    // HealthCheck 健康检查
    HealthCheck(ctx context.Context) error
}

// SyncRequest 拉取请求
type SyncRequest struct {
    SupplierCode    string
    HotelIDs        []string   // 供应商侧酒店 ID 列表
    StayDateStart   string     // YYYY-MM-DD
    StayDateEnd     string     // YYYY-MM-DD
    PullFull        bool       // 是否全量拉取
}

4.3 新增供应商流程

  1. providers/ 下新建供应商目录
  2. 实现 SupplierSyncAdapter 接口(parser + client + webhook)
  3. 注册到 AdapterRegistry
  4. 配置拉取参数(sync_interval_minutes、sync_validity_minutes 等)

5. 拉取机制(Pull)

5.1 拉取调度

使用"滑动窗口 + 分片"方式调度:

定时扫描任务(每分钟)
    ↓
查询所有已匹配且 connection_mode = 'direct_procure' 的 supplier_hotels
    ↓
过滤:last_synced_at + sync_interval_minutes < now()
    ↓
按 supplier_code 分片,投递到 job queue
    ↓
consumer 按供应商并发拉取

5.2 拉取粒度

5.3 拉取流程

flowchart TD
    A[定时扫描任务] --> B{last_synced_at + interval < now?}
    B -->|否| Z[跳过]
    B -->|是| C[创建 Pull Job]
    C --> D{Job Queue 有空闲 consumer?}
    D -->|是| E[调用适配器 Pull]
    D -->|否| F[等待下一轮]
    E --> G{拉取成功?}
    G -->|是| H[批量 UPSERT DB]
    G -->|否| I[记录失败次数]
    I --> J{连续失败 >= N?}
    J -->|是| K[标记同步异常/暂停]
    J -->|否| L[下次继续拉取]
    H --> M[异步更新 Redis]
    class B,D,G decision
    class I,J,K error
    class F warning
    class Z skip
    class A,C,E,H,L,M process
    classDef process fill:#1a2744,stroke:#58a6ff,stroke-width:1px,color:#c9d1d9
    classDef decision fill:#3a2a1a,stroke:#f0883e,stroke-width:2px,color:#f0883e
    classDef done fill:#1a3a2a,stroke:#3fb950,stroke-width:1px,color:#3fb950
    classDef error fill:#3a1a1a,stroke:#f85149,stroke-width:2px,color:#f85149
    classDef warning fill:#3a2a1a,stroke:#d29922,stroke-width:1px,color:#d29922
    classDef skip fill:#21262d,stroke:#484f58,color:#8b949e

6. 推送机制(Push)

6.1 Webhook 接收

POST /webhook/sync/{supplier_code}

6.2 推送处理流程

flowchart TD
    A[供应商推送请求] --> B[URL 路由到适配器]
    B --> C[适配器验证签名]
    C --> D{签名有效?}
    D -->|否| E[返回 401]
    D -->|是| F[解析请求体 → SyncData]
    F --> G{request_id 幂等检查}
    G -->|已处理| H[返回 200 跳过]
    G -->|未处理| I[标记 request_id 已处理]
    I --> J[批量 UPSERT DB]
    J --> K[异步更新 Redis]
    K --> L[返回 200 成功]
    class D,G decision
    class L done
    class H skip
    class A,B,C,E,F,I,J,K process
    classDef process fill:#1a2744,stroke:#58a6ff,stroke-width:1px,color:#c9d1d9
    classDef decision fill:#3a2a1a,stroke:#f0883e,stroke-width:2px,color:#f0883e
    classDef done fill:#1a3a2a,stroke:#3fb950,stroke-width:1px,color:#3fb950
    classDef error fill:#3a1a1a,stroke:#f85149,stroke-width:2px,color:#f85149
    classDef warning fill:#3a2a1a,stroke:#d29922,stroke-width:1px,color:#d29922
    classDef skip fill:#21262d,stroke:#484f58,color:#8b949e

6.3 推送安全

安全措施 说明
签名验证 HMAC-SHA256,secret 存 api_credentials.webhook_secret
IP 白名单 可选,存 api_credentials.webhook_allowed_ips
幂等处理 request_id + Redis SET NX,TTL 24h
限流 按供应商限流,防止恶意推送

7. 缓存策略

7.1 Redis 缓存设计

Key 格式:   snapshot:{supplier_code}:{supplier_hotel_id}:{room_type_code}:{stay_date}
Value:      JSON(SyncData)
TTL:        与 sync_validity_minutes 对齐

7.2 缓存更新

7.3 缓存一致性

场景 处理方式
同步写入 DB 后更新 Redis 异步更新,允许短暂不一致(毫秒级)
取消订单释放库存 同步更新 Redis(保证可订性实时准确)
Redis 宕机 降级查 DB,不影响核心功能
缓存过期 TTL 自动清除,查价时 cache miss 回填

8. 价格有效期管理

8.1 过期判断

数据过期条件: now - last_synced_at > sync_validity_minutes

8.2 新增表字段

-- supplier_hotels 表新增字段
ALTER TABLE supplier_hotels ADD COLUMN connection_mode VARCHAR(32) DEFAULT 'direct_connect';
ALTER TABLE supplier_hotels ADD COLUMN sync_interval_minutes INT DEFAULT 120;
ALTER TABLE supplier_hotels ADD COLUMN sync_validity_minutes INT DEFAULT 120;
ALTER TABLE supplier_hotels ADD COLUMN sync_max_failures INT DEFAULT 3;
ALTER TABLE supplier_hotels ADD COLUMN sync_suspend_threshold INT DEFAULT 10;
ALTER TABLE supplier_hotels ADD COLUMN sync_status VARCHAR(20) DEFAULT 'normal';
ALTER TABLE supplier_hotels ADD COLUMN sync_fail_count INT DEFAULT 0;
ALTER TABLE supplier_hotels ADD COLUMN last_synced_at TIMESTAMPTZ;

注意: connection_mode 从 suppliers 表迁移到 supplier_hotels 表,支持同一供应商不同酒店使用不同连接模式。


9. 同步失败处理

9.1 失败分级

阶段 条件 行为 QE 影响
忽略 单次拉取失败 记录日志,继续用旧数据 无影响
降级 连续失败 >= sync_max_failures 标记 sync_status = 'degraded',降低查询优先级 仍返回但标注可能过期
暂停 连续失败 >= sync_suspend_threshold 标记 sync_status = 'suspended',停止返回 查价不返回该供应商
恢复 拉取成功 重置 sync_fail_count,sync_status = 'normal' 正常返回

9.2 推送失败检测

推送失败(供应商推送了但平台没收到)通过拉取对账发现: - 每次拉取时,比较供应商返回的数据时间戳和本地的 last_synced_at - 如果供应商有更新数据但我们没有,说明推送有遗漏 - 记录对账日志,自动通过拉取补全


10. QE 直采查价链路

10.1 DirectProcureAdapter

// DirectProcureAdapter 直采供应商查价适配器
// 实现 SupplierAdapter 接口,查本地 snapshot 而非远程 API
type DirectProcureAdapter struct {
    inventoryClient *InventoryClient  // inventory 服务客户端
    redisClient     *redis.Client     // Redis 缓存
    syncValidity    int               // 价格有效期(分钟)
}

func (a *DirectProcureAdapter) SearchRates(ctx context.Context, req SearchRequest) ([]PriceOption, error) {
    // 1. 查 Redis 缓存
    // 2. Cache miss → 查 inventory 服务(DB)
    // 3. 检查数据是否过期(now - last_synced_at > sync_validity)
    // 4. 过期则跳过不返回
    // 5. 转换为 PriceOption 格式返回
}

10.2 查价路由

QE 注册 supplier 时,根据 supplier_hotels 的 connection_mode 选择 adapter: - direct_connect → HTTPSupplier(远程 API) - direct_procure → DirectProcureAdapter(本地 snapshot + Redis)


11. 订单库存释放

11.1 取消订单时库存处理

场景 available_rooms 行为
有库存数(> 0) 立即释放:UPDATE SET available_rooms = available_rooms + room_count
有价即有房(NULL) 无需操作
直连供应商 不涉及本地库存

11.2 并发安全

使用原子 UPDATE 语句:

UPDATE inventory_snapshots 
SET available_rooms = available_rooms + $1,
    updated_at = NOW()
WHERE supplier_code = $2 
  AND supplier_hotel_id = $3 
  AND room_type_code = $4 
  AND stay_date = $5;

同时同步更新 Redis:INCRBY 对应 key。