%%{init: {'theme': 'light', 'themeVariables': { 'primaryTextColor': '#45567e', 'labelBoxBorderColor': '#45567e', 'messageTextColor': '#45567e, 'sequenceNumberColor': '#45567e', 'activationBkgColor': '#444', 'activationBorderColor': '#45567e' }} }%% sequenceDiagram actor user as 用户 user->>直播接口: 用户访问直播间 直播接口->>Kafka: pv统计日志写入消息队列 flink->>Kafka: 消费消息队列的pv日志 flink->>flink: 根据直播间和状态统计pv数据 flink->>mysql: 将统计数据回写到mysql 直播核心->>mysql: 定时任务获取到flink统计数据 直播核心->>直播核心: 将flink的统计数据进一步汇总到结果表

1、日志生成与采集

  • 日志格式

    //日志格式
    CountingLog|livecount|subType|status|uid|liveId|count
    //示例
    CountingLog|livecount|pv|preLive|201003011099|202301022091110099|1
    
  • 字段说明

    • subType:pv / 点赞 / 关注 / 评论 / 订阅 / 取消订阅
    • status:preLive(预告) / live(直播) / vod(回放)
    • count:增量值,值为1
  • 日志采集

    • 通过Kafka 收集日志,确保高吞吐与可靠性。
    • 用户调用进房接口时直接写入Kafka,避免同步阻塞。

2、Flink实时统计链路

目标:消费日志并实时聚合到MySQL的live_statistics_raw表。

2.1、数据流处理

  • 状态管理:使用Flink RocksDB状态后端,支持海量Key状态存储。
  • 延迟处理:允许1小时延迟数据,通过ALLOWED_LATENESS补充更新。
  • 幂等写入:数据库按(live_id, date, live_status)唯一索引,冲突时合并增量。

CREATE TABLE KafkaLogSource (
  subType STRING,
  scene STRING,
  liveId STRING,
  count BIGINT,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'live_count_log',
  'properties.bootstrap.servers' = '...',
  'format' = 'csv'
);

CREATE TABLE RawStatsSink (
  live_id STRING,
  date STRING,
  live_status STRING,
  pv BIGINT,
  praise BIGINT,
  ...
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://...',
  'table-name' = 'live_statistics_raw'
);

INSERT INTO RawStatsSink

SELECT
  liveId,
  DATE_FORMAT(event_time, 'yyyy-MM-dd') AS date,
  scene AS live_status,
  SUM(CASE WHEN subType = 'pv' THEN count ELSE 0 END) AS pv,
  SUM(CASE WHEN subType = 'praise' THEN count ELSE 0 END) AS praise,
  ...
FROM KafkaLogSource

GROUP BY liveId, date, scene

WINDOW TUMBLE (SIZE 1 DAY); -- 按天滚动窗口

3、数据表设计

3.1、原始数据表

live_statistics_raw表结构:


CREATE TABLE live_statistics_raw (
  live_id VARCHAR(32) NOT NULL,
  date DATE NOT NULL,
  live_status ENUM('pre', 'live', 'vod') NOT NULL,
  pv BIGINT DEFAULT 0,
  praise BIGINT DEFAULT 0,
  ...
  PRIMARY KEY (live_id, date, live_status)
);

3.2、汇总数据表

live_stat_summary表结构:


CREATE TABLE live_stat_summary (
  live_id VARCHAR(32) NOT NULL,
  live_status ENUM('pre', 'live', 'vod') NOT NULL,
  pv BIGINT DEFAULT 0,
  praise BIGINT DEFAULT 0,
  ...
  PRIMARY KEY (live_id, live_status)
);

4、分布式定时任务设计

4.1、全量统计任务

任务ID:LIVE_FULL_STAT_TASK 任务目标:归档3天前数据,缓解数据库压力。每天23点执行,归档t-3天及之前的数据到t-2这天。 防降逻辑:汇总时每个数据取MAX(当前值, 新值),确保数据单调递增。

-- 每天23点执行,归档t-3天及之前的数据到t-2天
INSERT INTO live_statistics_raw (live_id, date, live_status, pv, ...)
SELECT
  live_id,
  DATE_SUB(CURDATE(), 2) AS date, -- 归档到t-2天
  live_status,
  SUM(pv) AS pv,
  SUM(praise) AS praise,
  ...
FROM live_statistics_raw

WHERE date <= DATE_SUB(CURDATE(), 3)
GROUP BY live_id, live_status;

-- 删除已归档的t-3天及之前的旧数据
DELETE FROM live_statistics_raw

WHERE date <= DATE_SUB(CURDATE(), 3);

4.2、汇总合并统计任务

任务ID:

  • LIVE_MERGE_STAT_LIVE_TASK:直播中的直播间数据合并任务,实时性要求高,3-10秒一次
  • LIVE_MERGE_STAT_VOD_TASK:回放状态直播间数据合并任务,实时性要求相对低,30-60秒一次 任务目标:将live_statistics_raw表中近3天的原始数据合并到汇总表live_stat_summary,将每天,每个直播间的,每种状态分别合并到汇总表相应的记录中。
-- 合并直播中数据
INSERT INTO live_stat_summary (live_id, live_status, pv, ...)
SELECT
  live_id,
  'live' AS live_status,
  SUM(pv) AS pv,
  SUM(praise) AS praise,
  ...
FROM live_statistics_raw

WHERE live_status = 'live'
  AND date >= DATE_SUB(CURDATE(), 2)
GROUP BY live_id

ON DUPLICATE KEY UPDATE
  pv = GREATEST(pv, VALUES(pv)),
  praise = GREATEST(praise, VALUES(praise)),
  ...;

5、高并发降级链路

sequenceDiagram actor user as 用户 user->>直播接口: 用户访问直播间 直播接口->>本地缓存: 读取本地缓存中的实时数据 alt 未命中本地缓存 直播接口->>Redis: 读取分布式缓存中的实时数据 本地缓存->>本地缓存: 更新本地缓存并 + 1 else 命中本地缓存 本地缓存->>本地缓存: 更新本地缓存原子 + 1 end 本地缓存->>直播接口: 返回实时数据 直播接口->>user: 返回实时数据 本地缓存->>Redis: 定时同步本地缓存数据同步增量Redis Redis->>本地缓存: 定时将Redis数据同步到本地缓存

5.1、降级链路核心策略

核心目标

  • 零数据库依赖:所有读写操作仅通过缓存层完成。
  • 数据最终一致性:通过Redis暂存数据,流量恢复正常后异步回刷MySQL。
  • 热点隔离:动态分片+本地队列聚合请求,避免缓存击穿。

5.2、降级链路详细设计

5.2.1、缓存层增强

  • 本地缓存(L1)
    • 预加载策略:大促前通过历史数据预加载Top直播间信息到本地缓存,设置1秒过期。
    • 写优化:用户进入直播间时,原子更新本地计数器,避免锁竞争。
    • 防击穿:对热点直播间Key设置永不过期,并通过互斥锁重建缓存。
  • Redis分布式缓存(L2)
    • 分片扩容:按liveId哈希分片,对Top直播间动态扩容分片节点,避免单点瓶颈。
    • 批量写入:本地缓存定时通过Pipeline批量同步增量到Redis,减少网络开销。
    • 持久化保障:开启AOF持久化,防止Redis异常重启导致数据丢失。 5.2.2、降级触发与恢复
  • 触发条件
    • 监控MySQL负载(如QPS>10万或CPU>90%),自动触发降级。
    • 运维手动通过配置中心强制降级。
  • 恢复机制
    • 大促结束后,通过定时任务将Redis数据合并回刷MySQL,恢复原链路。

5.3、数据一致性保障

  • 写流程

    1. 用户请求更新本地缓存(原子操作)。
    2. 本地缓存定时批量同步到Redis,通过补偿机制处理失败。
    3. 降级期间不写MySQL,仅依赖Redis暂存数据。
  • 读流程

    1. 优先读本地缓存,未命中则查询Redis,并将数据写入本地缓存。
    2. 本地缓存定时从Redis同步统计数据,并更新本地缓存。
    3. 完全屏蔽MySQL查询,返回缓存默认值(如0)或预设文案。