优先级: P0(直采供应商核心依赖) 开发周期: 2 周 依赖: supplier-adapter(供应商适配器)、inventory(库存快照)、worker(定时任务)
直采供应商(direct_procure)的价格和库存不是实时查询的,而是通过主动拉取或供应商推送两种方式预先同步到平台。本模块负责:
| 维度 | 直连 direct_connect | 直采 direct_procure |
|---|---|---|
| 查价 | 实时调供应商 API | 查本地 snapshot + Redis |
| 下单/取消/确认 | 调供应商 API | 同左,也调供应商 API |
| 价格来源 | 供应商 API 实时返回 | 拉取/推送预先同步 |
| 库存管理 | 不管理,有价即有房 | 可能管理,也可能有价即有房 |
| 数据时效 | 实时 | 取决于同步频率 |
供应商推送/平台拉取 → 解析数据 → 转标准格式 → 批量 UPSERT DB → 异步更新 Redis
不使用 MQ 中转的原因: - UPSERT 天然去重——同一数据点(supplier_code + hotel_id + room_type + stay_date)无论写入多少次,结果都是最新的 - 避免 MQ 堆积问题(旧消息堵住新消息) - 减少一层中转,链路更简单,大数据量下更可控
┌─────────────────┐ ┌──────────────┐ ┌─────────────┐ ┌───────┐
│ 数据来源 │────▶│ 标准格式转换 │────▶│ 批量UPSERT │────▶│ Redis │
│ 拉取 / 推送 │ │ (适配器层) │ │ PostgreSQL │ │ 异步 │
└─────────────────┘ └──────────────┘ └─────────────┘ └───────┘
INSERT ... ON CONFLICT (unique_key) DO UPDATE SET ...// SyncData 单条同步数据的标准格式
type SyncData struct {
SupplierCode string `json:"supplier_code"`
SupplierHotelID string `json:"supplier_hotel_id"`
RoomTypeCode string `json:"room_type_code"`
StayDate string `json:"stay_date"` // YYYY-MM-DD
GrossPrice float64 `json:"gross_price"` // 卖价(含加价)
NetPrice float64 `json:"net_price"` // 采购价
Currency string `json:"currency"` // CNY/USD/...
AvailableRooms *int `json:"available_rooms"` // NULL = 有价即有房
MealPlan string `json:"meal_plan"` // BB/HB/FB/...
DataHash string `json:"data_hash"` // 数据内容 hash,用于变更检测
SyncTimestamp int64 `json:"sync_timestamp"` // 供应商侧数据时间戳(毫秒)
}
| 值 | 含义 | 行为 |
|---|---|---|
NULL |
有价即有房 | 查价时始终返回,不限库存数 |
0 |
售罄 | 查价时不返回 |
> 0 |
有库存限制 | 查价时返回,扣库存用 |
services/supplier-adapter/
internal/
standard/ # 平台标准定义
types.go # SyncData、SyncRequest、SyncResponse
adapter.go # SupplierSyncAdapter 接口
registry.go # 适配器注册表
providers/ # 各供应商适配包(每个供应商一个目录)
demo/ # 示例供应商
adapter.go # 实现 SupplierSyncAdapter
parser.go # 原始数据 → SyncData 转换
client.go # 供应商 API 调用封装
webhook.go # Webhook 签名验证 + 解析
booking/ # Booking.com 适配
...
// SupplierSyncAdapter 供应商同步适配器接口
type SupplierSyncAdapter interface {
// Code 返回供应商编码
Code() string
// Pull 主动拉取指定酒店列表的价格/库存
// 返回标准格式的数据切片
Pull(ctx context.Context, req SyncRequest) ([]SyncData, error)
// ParsePush 解析供应商推送的数据
// 负责签名验证 + 格式转换
ParsePush(ctx context.Context, body []byte, headers map[string]string) ([]SyncData, error)
// HealthCheck 健康检查
HealthCheck(ctx context.Context) error
}
// SyncRequest 拉取请求
type SyncRequest struct {
SupplierCode string
HotelIDs []string // 供应商侧酒店 ID 列表
StayDateStart string // YYYY-MM-DD
StayDateEnd string // YYYY-MM-DD
PullFull bool // 是否全量拉取
}
providers/ 下新建供应商目录SupplierSyncAdapter 接口(parser + client + webhook)AdapterRegistry使用"滑动窗口 + 分片"方式调度:
定时扫描任务(每分钟)
↓
查询所有已匹配且 connection_mode = 'direct_procure' 的 supplier_hotels
↓
过滤:last_synced_at + sync_interval_minutes < now()
↓
按 supplier_code 分片,投递到 job queue
↓
consumer 按供应商并发拉取
flowchart TD
A[定时扫描任务] --> B{last_synced_at + interval < now?}
B -->|否| Z[跳过]
B -->|是| C[创建 Pull Job]
C --> D{Job Queue 有空闲 consumer?}
D -->|是| E[调用适配器 Pull]
D -->|否| F[等待下一轮]
E --> G{拉取成功?}
G -->|是| H[批量 UPSERT DB]
G -->|否| I[记录失败次数]
I --> J{连续失败 >= N?}
J -->|是| K[标记同步异常/暂停]
J -->|否| L[下次继续拉取]
H --> M[异步更新 Redis]
class B,D,G decision
class I,J,K error
class F warning
class Z skip
class A,C,E,H,L,M process
classDef process fill:#1a2744,stroke:#58a6ff,stroke-width:1px,color:#c9d1d9
classDef decision fill:#3a2a1a,stroke:#f0883e,stroke-width:2px,color:#f0883e
classDef done fill:#1a3a2a,stroke:#3fb950,stroke-width:1px,color:#3fb950
classDef error fill:#3a1a1a,stroke:#f85149,stroke-width:2px,color:#f85149
classDef warning fill:#3a2a1a,stroke:#d29922,stroke-width:1px,color:#d29922
classDef skip fill:#21262d,stroke:#484f58,color:#8b949e
POST /webhook/sync/{supplier_code}
supplier_code 路由到对应适配器api_credentials.webhook_secret 验证request_id 或数据 hash 做 Redis SET NX 去重,防止重复消费flowchart TD
A[供应商推送请求] --> B[URL 路由到适配器]
B --> C[适配器验证签名]
C --> D{签名有效?}
D -->|否| E[返回 401]
D -->|是| F[解析请求体 → SyncData]
F --> G{request_id 幂等检查}
G -->|已处理| H[返回 200 跳过]
G -->|未处理| I[标记 request_id 已处理]
I --> J[批量 UPSERT DB]
J --> K[异步更新 Redis]
K --> L[返回 200 成功]
class D,G decision
class L done
class H skip
class A,B,C,E,F,I,J,K process
classDef process fill:#1a2744,stroke:#58a6ff,stroke-width:1px,color:#c9d1d9
classDef decision fill:#3a2a1a,stroke:#f0883e,stroke-width:2px,color:#f0883e
classDef done fill:#1a3a2a,stroke:#3fb950,stroke-width:1px,color:#3fb950
classDef error fill:#3a1a1a,stroke:#f85149,stroke-width:2px,color:#f85149
classDef warning fill:#3a2a1a,stroke:#d29922,stroke-width:1px,color:#d29922
classDef skip fill:#21262d,stroke:#484f58,color:#8b949e
| 安全措施 | 说明 |
|---|---|
| 签名验证 | HMAC-SHA256,secret 存 api_credentials.webhook_secret |
| IP 白名单 | 可选,存 api_credentials.webhook_allowed_ips |
| 幂等处理 | request_id + Redis SET NX,TTL 24h |
| 限流 | 按供应商限流,防止恶意推送 |
Key 格式: snapshot:{supplier_code}:{supplier_hotel_id}:{room_type_code}:{stay_date}
Value: JSON(SyncData)
TTL: 与 sync_validity_minutes 对齐
| 场景 | 处理方式 |
|---|---|
| 同步写入 DB 后更新 Redis | 异步更新,允许短暂不一致(毫秒级) |
| 取消订单释放库存 | 同步更新 Redis(保证可订性实时准确) |
| Redis 宕机 | 降级查 DB,不影响核心功能 |
| 缓存过期 | TTL 自动清除,查价时 cache miss 回填 |
数据过期条件: now - last_synced_at > sync_validity_minutes
sync_validity_minutes 配置在 supplier_hotels 表上,默认 120 分钟-- supplier_hotels 表新增字段
ALTER TABLE supplier_hotels ADD COLUMN connection_mode VARCHAR(32) DEFAULT 'direct_connect';
ALTER TABLE supplier_hotels ADD COLUMN sync_interval_minutes INT DEFAULT 120;
ALTER TABLE supplier_hotels ADD COLUMN sync_validity_minutes INT DEFAULT 120;
ALTER TABLE supplier_hotels ADD COLUMN sync_max_failures INT DEFAULT 3;
ALTER TABLE supplier_hotels ADD COLUMN sync_suspend_threshold INT DEFAULT 10;
ALTER TABLE supplier_hotels ADD COLUMN sync_status VARCHAR(20) DEFAULT 'normal';
ALTER TABLE supplier_hotels ADD COLUMN sync_fail_count INT DEFAULT 0;
ALTER TABLE supplier_hotels ADD COLUMN last_synced_at TIMESTAMPTZ;
注意: connection_mode 从 suppliers 表迁移到 supplier_hotels 表,支持同一供应商不同酒店使用不同连接模式。
| 阶段 | 条件 | 行为 | QE 影响 |
|---|---|---|---|
| 忽略 | 单次拉取失败 | 记录日志,继续用旧数据 | 无影响 |
| 降级 | 连续失败 >= sync_max_failures | 标记 sync_status = 'degraded',降低查询优先级 | 仍返回但标注可能过期 |
| 暂停 | 连续失败 >= sync_suspend_threshold | 标记 sync_status = 'suspended',停止返回 | 查价不返回该供应商 |
| 恢复 | 拉取成功 | 重置 sync_fail_count,sync_status = 'normal' | 正常返回 |
推送失败(供应商推送了但平台没收到)通过拉取对账发现: - 每次拉取时,比较供应商返回的数据时间戳和本地的 last_synced_at - 如果供应商有更新数据但我们没有,说明推送有遗漏 - 记录对账日志,自动通过拉取补全
// DirectProcureAdapter 直采供应商查价适配器
// 实现 SupplierAdapter 接口,查本地 snapshot 而非远程 API
type DirectProcureAdapter struct {
inventoryClient *InventoryClient // inventory 服务客户端
redisClient *redis.Client // Redis 缓存
syncValidity int // 价格有效期(分钟)
}
func (a *DirectProcureAdapter) SearchRates(ctx context.Context, req SearchRequest) ([]PriceOption, error) {
// 1. 查 Redis 缓存
// 2. Cache miss → 查 inventory 服务(DB)
// 3. 检查数据是否过期(now - last_synced_at > sync_validity)
// 4. 过期则跳过不返回
// 5. 转换为 PriceOption 格式返回
}
QE 注册 supplier 时,根据 supplier_hotels 的 connection_mode 选择 adapter:
- direct_connect → HTTPSupplier(远程 API)
- direct_procure → DirectProcureAdapter(本地 snapshot + Redis)
| 场景 | available_rooms 行为 |
|---|---|
| 有库存数(> 0) | 立即释放:UPDATE SET available_rooms = available_rooms + room_count |
| 有价即有房(NULL) | 无需操作 |
| 直连供应商 | 不涉及本地库存 |
使用原子 UPDATE 语句:
UPDATE inventory_snapshots
SET available_rooms = available_rooms + $1,
updated_at = NOW()
WHERE supplier_code = $2
AND supplier_hotel_id = $3
AND room_type_code = $4
AND stay_date = $5;
同时同步更新 Redis:INCRBY 对应 key。