1、日志生成与采集
日志格式:
//日志格式 CountingLog|livecount|subType|status|uid|liveId|count //示例 CountingLog|livecount|pv|preLive|201003011099|202301022091110099|1
字段说明:
- subType:pv / 点赞 / 关注 / 评论 / 订阅 / 取消订阅
- status:preLive(预告) / live(直播) / vod(回放)
- count:增量值,值为1
日志采集:
- 通过Kafka 收集日志,确保高吞吐与可靠性。
- 用户调用进房接口时直接写入Kafka,避免同步阻塞。
2、Flink实时统计链路
目标:消费日志并实时聚合到MySQL的live_statistics_raw表。
2.1、数据流处理
- 状态管理:使用Flink RocksDB状态后端,支持海量Key状态存储。
- 延迟处理:允许1小时延迟数据,通过ALLOWED_LATENESS补充更新。
- 幂等写入:数据库按(live_id, date, live_status)唯一索引,冲突时合并增量。
2.2、Flink SQL实现
CREATE TABLE KafkaLogSource (
subType STRING,
scene STRING,
liveId STRING,
count BIGINT,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'live_count_log',
'properties.bootstrap.servers' = '...',
'format' = 'csv'
);
CREATE TABLE RawStatsSink (
live_id STRING,
date STRING,
live_status STRING,
pv BIGINT,
praise BIGINT,
...
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://...',
'table-name' = 'live_statistics_raw'
);
INSERT INTO RawStatsSink
SELECT
liveId,
DATE_FORMAT(event_time, 'yyyy-MM-dd') AS date,
scene AS live_status,
SUM(CASE WHEN subType = 'pv' THEN count ELSE 0 END) AS pv,
SUM(CASE WHEN subType = 'praise' THEN count ELSE 0 END) AS praise,
...
FROM KafkaLogSource
GROUP BY liveId, date, scene
WINDOW TUMBLE (SIZE 1 DAY); -- 按天滚动窗口
3、数据表设计
3.1、原始数据表
live_statistics_raw表结构:
CREATE TABLE live_statistics_raw (
live_id VARCHAR(32) NOT NULL,
date DATE NOT NULL,
live_status ENUM('pre', 'live', 'vod') NOT NULL,
pv BIGINT DEFAULT 0,
praise BIGINT DEFAULT 0,
...
PRIMARY KEY (live_id, date, live_status)
);
3.2、汇总数据表
live_stat_summary表结构:
CREATE TABLE live_stat_summary (
live_id VARCHAR(32) NOT NULL,
live_status ENUM('pre', 'live', 'vod') NOT NULL,
pv BIGINT DEFAULT 0,
praise BIGINT DEFAULT 0,
...
PRIMARY KEY (live_id, live_status)
);
4、分布式定时任务设计
4.1、全量统计任务
任务ID:LIVE_FULL_STAT_TASK 任务目标:归档3天前数据,缓解数据库压力。每天23点执行,归档t-3天及之前的数据到t-2这天。 防降逻辑:汇总时每个数据取MAX(当前值, 新值),确保数据单调递增。
-- 每天23点执行,归档t-3天及之前的数据到t-2天
INSERT INTO live_statistics_raw (live_id, date, live_status, pv, ...)
SELECT
live_id,
DATE_SUB(CURDATE(), 2) AS date, -- 归档到t-2天
live_status,
SUM(pv) AS pv,
SUM(praise) AS praise,
...
FROM live_statistics_raw
WHERE date <= DATE_SUB(CURDATE(), 3)
GROUP BY live_id, live_status;
-- 删除已归档的t-3天及之前的旧数据
DELETE FROM live_statistics_raw
WHERE date <= DATE_SUB(CURDATE(), 3);
4.2、汇总合并统计任务
任务ID:
- LIVE_MERGE_STAT_LIVE_TASK:直播中的直播间数据合并任务,实时性要求高,3-10秒一次
- LIVE_MERGE_STAT_VOD_TASK:回放状态直播间数据合并任务,实时性要求相对低,30-60秒一次 任务目标:将live_statistics_raw表中近3天的原始数据合并到汇总表live_stat_summary,将每天,每个直播间的,每种状态分别合并到汇总表相应的记录中。
-- 合并直播中数据
INSERT INTO live_stat_summary (live_id, live_status, pv, ...)
SELECT
live_id,
'live' AS live_status,
SUM(pv) AS pv,
SUM(praise) AS praise,
...
FROM live_statistics_raw
WHERE live_status = 'live'
AND date >= DATE_SUB(CURDATE(), 2)
GROUP BY live_id
ON DUPLICATE KEY UPDATE
pv = GREATEST(pv, VALUES(pv)),
praise = GREATEST(praise, VALUES(praise)),
...;
5、高并发降级链路
5.1、降级链路核心策略
核心目标:
- 零数据库依赖:所有读写操作仅通过缓存层完成。
- 数据最终一致性:通过Redis暂存数据,流量恢复正常后异步回刷MySQL。
- 热点隔离:动态分片+本地队列聚合请求,避免缓存击穿。
5.2、降级链路详细设计
5.2.1、缓存层增强
- 本地缓存(L1):
- 预加载策略:大促前通过历史数据预加载Top直播间信息到本地缓存,设置1秒过期。
- 写优化:用户进入直播间时,原子更新本地计数器,避免锁竞争。
- 防击穿:对热点直播间Key设置永不过期,并通过互斥锁重建缓存。
- Redis分布式缓存(L2):
- 分片扩容:按liveId哈希分片,对Top直播间动态扩容分片节点,避免单点瓶颈。
- 批量写入:本地缓存定时通过Pipeline批量同步增量到Redis,减少网络开销。
- 持久化保障:开启AOF持久化,防止Redis异常重启导致数据丢失。 5.2.2、降级触发与恢复
- 触发条件:
- 监控MySQL负载(如QPS>10万或CPU>90%),自动触发降级。
- 运维手动通过配置中心强制降级。
- 恢复机制:
- 大促结束后,通过定时任务将Redis数据合并回刷MySQL,恢复原链路。
5.3、数据一致性保障
写流程:
- 用户请求更新本地缓存(原子操作)。
- 本地缓存定时批量同步到Redis,通过补偿机制处理失败。
- 降级期间不写MySQL,仅依赖Redis暂存数据。
读流程:
- 优先读本地缓存,未命中则查询Redis,并将数据写入本地缓存。
- 本地缓存定时从Redis同步统计数据,并更新本地缓存。
- 完全屏蔽MySQL查询,返回缓存默认值(如0)或预设文案。