RTMP推流流程详解
RTMP是Adobe公司开发的流媒体传输协议,主要用于Flash Player与服务器之间的音视频数据传输。特点基于TCP协议,保证可靠传输低延迟 (通常1-3秒)支持实时互动使用AMF (Action Message Format) 编码采用分块 (Chunk) 传输机制协议栈│ RTMP Chunk (分块传输) ││ TCP (可靠传输) ││ IP │位置RTMP_NETWORK_CHAN
本文档结合FFmpeg源代码,详细解释RTMP (Real-Time Messaging Protocol) 推流的完整流程和原理。
一、RTMP协议概述
1.1 RTMP简介
RTMP是Adobe公司开发的流媒体传输协议,主要用于Flash Player与服务器之间的音视频数据传输。
特点:
- 基于TCP协议,保证可靠传输
- 低延迟 (通常1-3秒)
- 支持实时互动
- 使用AMF (Action Message Format) 编码
- 采用分块 (Chunk) 传输机制
协议栈:
┌──────────────────────────────────┐
│ Application (FLV Data) │
├──────────────────────────────────┤
│ RTMP Message (AMF/Data) │
├──────────────────────────────────┤
│ RTMP Chunk (分块传输) │
├──────────────────────────────────┤
│ TCP (可靠传输) │
├──────────────────────────────────┤
│ IP │
└──────────────────────────────────┘
1.2 RTMP变种
位置: libavformat/rtmpproto.c:2695-2723
if (!strcmp(proto, "rtmpt") || !strcmp(proto, "rtmpts")) {
// RTMPT: RTMP over HTTP (隧道模式)
ff_url_join(buf, sizeof(buf), "ffrtmphttp", NULL, hostname, port, NULL);
} else if (!strcmp(proto, "rtmps")) {
// RTMPS: RTMP over TLS (加密)
if (port < 0)
port = RTMPS_DEFAULT_PORT; // 443
ff_url_join(buf, sizeof(buf), "tls", NULL, hostname, port, NULL);
} else if (!strcmp(proto, "rtmpe") || (!strcmp(proto, "rtmpte"))) {
// RTMPE: RTMP Encrypted (加密)
ff_url_join(buf, sizeof(buf), "ffrtmpcrypt", NULL, hostname, port, NULL);
rt->encrypted = 1;
} else {
// RTMP: 标准RTMP
if (port < 0)
port = RTMP_DEFAULT_PORT; // 1935
ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
}
协议对比:
| 协议 | 端口 | 传输层 | 加密 | 用途 |
|---|---|---|---|---|
| rtmp | 1935 | TCP | 否 | 标准RTMP |
| rtmps | 443 | TLS | 是 | 加密RTMP |
| rtmpe | 1935 | TCP | 是 | Adobe加密 |
| rtmpt | 80 | HTTP | 否 | 穿透防火墙 |
| rtmpte | 80 | HTTP | 是 | 加密隧道 |
二、RTMP推流整体流程
2.1 流程图
┌─────────────────────────────────────────────────────────┐
│ RTMP推流完整流程 │
└─────────────────────────────────────────────────────────┘
1. TCP连接建立
├─→ 连接到服务器端口1935
└─→ 建立TCP连接
2. RTMP握手 (Handshake)
├─→ C0+C1: 客户端发送版本和随机数据
├─→ S0+S1+S2: 服务器响应
└─→ C2: 客户端确认
3. 连接 (Connect)
├─→ 发送connect命令
├─→ 携带app名称、flashVer等信息
└─→ 接收_result响应
4. 创建流 (CreateStream)
├─→ 发送createStream命令
└─→ 接收stream_id
5. 发布流 (Publish)
├─→ 可选: releaseStream
├─→ 可选: FCPublish
├─→ 发送publish命令
└─→ 接收onStatus(NetStream.Publish.Start)
6. 数据传输
├─→ 发送音频数据包 (RTMP_PT_AUDIO)
├─→ 发送视频数据包 (RTMP_PT_VIDEO)
├─→ 发送元数据 (RTMP_PT_NOTIFY)
└─→ 接收服务器控制消息
7. 结束流
├─→ 可选: FCUnpublish
├─→ 发送deleteStream
└─→ 关闭TCP连接
2.2 状态机
位置: libavformat/rtmpproto.c:61-71
typedef enum {
STATE_START, ///< 客户端初始状态
STATE_HANDSHAKED, ///< 完成握手
STATE_FCPUBLISH, ///< FCPublishing流 (推流)
STATE_PLAYING, ///< 开始接收数据 (拉流)
STATE_SEEKING, ///< 执行seek操作
STATE_PUBLISHING, ///< 开始发送数据 (推流)
STATE_RECEIVING, ///< 收到publish命令 (服务端)
STATE_SENDING, ///< 收到play命令 (服务端)
STATE_STOPPED, ///< 广播已停止
} ClientState;
状态转换:
START
│
▼
HANDSHAKED (握手完成)
│
├─→ FCPUBLISH (可选)
│ │
│ ▼
└─→ PUBLISHING (推流中)
│
▼
STOPPED
三、核心流程详解
3.1 TCP连接建立
位置: libavformat/rtmpproto.c:2726-2731
if ((ret = ffurl_open_whitelist(&rt->stream, buf, AVIO_FLAG_READ_WRITE,
&s->interrupt_callback, opts,
s->protocol_whitelist, s->protocol_blacklist, s)) < 0) {
av_log(s , AV_LOG_ERROR, "Cannot open connection %s\n", buf);
goto fail;
}
关键点:
- 默认端口: 1935
- 支持TCP_NODELAY选项 (禁用Nagle算法)
- 支持连接超时设置
3.2 RTMP握手 (Handshake)
RTMP握手是一个三次握手过程,用于建立连接和验证客户端。
3.2.1 握手流程
位置: libavformat/rtmpproto.c:1243-1437
客户端 服务器
│ │
│ C0 (1 byte: version=3) │
├────────────────────────────────────▶│
│ │
│ C1 (1536 bytes: timestamp+random) │
├────────────────────────────────────▶│
│ │
│ S0 (1 byte: version=3) │
│◀────────────────────────────────────┤
│ │
│ S1 (1536 bytes: timestamp+random) │
│◀────────────────────────────────────┤
│ │
│ S2 (1536 bytes: echo of C1) │
│◀────────────────────────────────────┤
│ │
│ C2 (1536 bytes: echo of S1) │
├────────────────────────────────────▶│
│ │
│ 握手完成 │
3.2.2 握手包结构
C0/S0 (1字节):
0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
| Version=3 |
+-+-+-+-+-+-+-+-+
C1/S1 (1536字节):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Time (4 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Zero (4 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Random bytes (1528 bytes) |
| ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
3.2.3 握手代码实现
static int rtmp_handshake(URLContext *s, RTMPContext *rt)
{
AVLFG rnd;
uint8_t tosend[RTMP_HANDSHAKE_PACKET_SIZE+1] = {
3, // C0: 版本号
0, 0, 0, 0, // C1: 客户端时间
RTMP_CLIENT_VER1, RTMP_CLIENT_VER2,
RTMP_CLIENT_VER3, RTMP_CLIENT_VER4,
};
uint8_t clientdata[RTMP_HANDSHAKE_PACKET_SIZE];
uint8_t serverdata[RTMP_HANDSHAKE_PACKET_SIZE+1];
int ret;
av_lfg_init(&rnd, 0xDEADC0DE);
// 1. 生成C1随机数据 (1536字节)
for (i = 9; i <= RTMP_HANDSHAKE_PACKET_SIZE; i++)
tosend[i] = av_lfg_get(&rnd) >> 24;
// 2. 计算并添加digest (用于复杂握手)
client_pos = rtmp_handshake_imprint_with_digest(tosend + 1, rt->encrypted);
// 3. 发送C0+C1
if ((ret = ffurl_write(rt->stream, tosend,
RTMP_HANDSHAKE_PACKET_SIZE + 1)) < 0) {
av_log(s, AV_LOG_ERROR, "Cannot write RTMP handshake request\n");
return ret;
}
// 4. 接收S0+S1
if ((ret = ffurl_read_complete(rt->stream, serverdata,
RTMP_HANDSHAKE_PACKET_SIZE + 1)) < 0) {
av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
return ret;
}
// 5. 接收S2
if ((ret = ffurl_read_complete(rt->stream, clientdata,
RTMP_HANDSHAKE_PACKET_SIZE)) < 0) {
av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
return ret;
}
av_log(s, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n",
serverdata[5], serverdata[6], serverdata[7], serverdata[8]);
// 6. 验证S1的digest
if (rt->is_input && serverdata[5] >= 3) {
server_pos = rtmp_validate_digest(serverdata + 1, 772);
if (!server_pos) {
server_pos = rtmp_validate_digest(serverdata + 1, 8);
if (!server_pos) {
av_log(s, AV_LOG_ERROR, "Server response validating failed\n");
return AVERROR(EIO);
}
}
// 7. 计算C2的签名
ret = ff_rtmp_calc_digest(tosend + 1 + client_pos, 32, 0,
rtmp_server_key, sizeof(rtmp_server_key),
digest);
ret = ff_rtmp_calc_digest(clientdata, RTMP_HANDSHAKE_PACKET_SIZE - 32,
0, digest, 32, signature);
// 8. 验证S2的签名
if (memcmp(signature, clientdata + RTMP_HANDSHAKE_PACKET_SIZE - 32, 32)) {
av_log(s, AV_LOG_ERROR, "Signature mismatch\n");
return AVERROR(EIO);
}
// 9. 生成并发送C2
for (i = 0; i < RTMP_HANDSHAKE_PACKET_SIZE; i++)
tosend[i] = av_lfg_get(&rnd) >> 24;
ret = ff_rtmp_calc_digest(serverdata + 1 + server_pos, 32, 0,
rtmp_player_key, sizeof(rtmp_player_key),
digest);
ret = ff_rtmp_calc_digest(tosend, RTMP_HANDSHAKE_PACKET_SIZE - 32, 0,
digest, 32,
tosend + RTMP_HANDSHAKE_PACKET_SIZE - 32);
}
// 10. 发送C2
if ((ret = ffurl_write(rt->stream, tosend,
RTMP_HANDSHAKE_PACKET_SIZE)) < 0)
return ret;
return 0;
}
握手类型:
- 简单握手: 不验证digest,直接echo
- 复杂握手: 使用HMAC-SHA256验证,防止中间人攻击
3.3 连接 (Connect)
建立RTMP应用层连接,协商参数。
位置: libavformat/rtmpproto.c:323-450
static int gen_connect(URLContext *s, RTMPContext *rt)
{
RTMPPacket pkt;
uint8_t *p;
int ret;
// 1. 创建RTMP包
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
0, 4096 + APP_MAX_LENGTH)) < 0)
return ret;
p = pkt.data;
// 2. 写入AMF数据
ff_amf_write_string(&p, "connect"); // 命令名
ff_amf_write_number(&p, ++rt->nb_invokes); // 事务ID
// 3. 写入连接参数对象
ff_amf_write_object_start(&p);
ff_amf_write_field_name(&p, "app");
ff_amf_write_string2(&p, rt->app, rt->auth_params); // 应用名称
if (!rt->is_input) {
ff_amf_write_field_name(&p, "type");
ff_amf_write_string(&p, "nonprivate"); // 推流类型
}
ff_amf_write_field_name(&p, "flashVer");
ff_amf_write_string(&p, rt->flashver); // Flash版本
ff_amf_write_field_name(&p, "tcUrl");
ff_amf_write_string2(&p, rt->tcurl, rt->auth_params); // 目标URL
if (rt->is_input) {
// 拉流时的能力声明
ff_amf_write_field_name(&p, "fpad");
ff_amf_write_bool(&p, 0);
ff_amf_write_field_name(&p, "capabilities");
ff_amf_write_number(&p, 15.0);
ff_amf_write_field_name(&p, "audioCodecs");
ff_amf_write_number(&p, 4071.0); // 支持的音频编解码器
ff_amf_write_field_name(&p, "videoCodecs");
ff_amf_write_number(&p, 252.0); // 支持的视频编解码器
ff_amf_write_field_name(&p, "videoFunction");
ff_amf_write_number(&p, 1.0);
}
ff_amf_write_object_end(&p);
pkt.size = p - pkt.data;
// 4. 发送包
return rtmp_send_packet(rt, &pkt, 1);
}
Connect命令AMF格式:
┌──────────────────────────────────────┐
│ Connect命令结构 │
└──────────────────────────────────────┘
String: "connect"
Number: transaction_id (1.0)
Object: {
app: "live" // 应用名称
type: "nonprivate" // 推流类型
flashVer: "FMLE/3.0" // Flash版本
tcUrl: "rtmp://server/live" // 完整URL
// 拉流时额外参数:
fpad: false
capabilities: 15.0
audioCodecs: 4071.0
videoCodecs: 252.0
videoFunction: 1.0
}
服务器响应:
String: "_result"
Number: transaction_id (1.0)
Object: {
fmsVer: "FMS/3,0,1,123"
capabilities: 31.0
}
Object: {
level: "status"
code: "NetConnection.Connect.Success"
description: "Connection succeeded."
objectEncoding: 0.0
}
3.4 创建流 (CreateStream)
请求服务器分配一个流ID。
位置: libavformat/rtmpproto.c:715-733
static int gen_create_stream(URLContext *s, RTMPContext *rt)
{
RTMPPacket pkt;
uint8_t *p;
int ret;
av_log(s, AV_LOG_DEBUG, "Creating stream...\n");
// 1. 创建RTMP包
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
0, 25)) < 0)
return ret;
p = pkt.data;
// 2. 写入AMF数据
ff_amf_write_string(&p, "createStream"); // 命令名
ff_amf_write_number(&p, ++rt->nb_invokes); // 事务ID
ff_amf_write_null(&p); // 命令对象 (null)
// 3. 发送包
return rtmp_send_packet(rt, &pkt, 1);
}
CreateStream命令格式:
String: "createStream"
Number: transaction_id (2.0)
Null
服务器响应:
String: "_result"
Number: transaction_id (2.0)
Null
Number: stream_id (1.0) // 分配的流ID
3.5 发布流 (Publish)
通知服务器开始推流。
3.5.1 可选步骤
releaseStream (释放流):
static int gen_release_stream(URLContext *s, RTMPContext *rt)
{
RTMPPacket pkt;
uint8_t *p;
int ret;
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
0, 29 + strlen(rt->playpath))) < 0)
return ret;
av_log(s, AV_LOG_DEBUG, "Releasing stream...\n");
p = pkt.data;
ff_amf_write_string(&p, "releaseStream");
ff_amf_write_number(&p, ++rt->nb_invokes);
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath); // 流名称
return rtmp_send_packet(rt, &pkt, 1);
}
FCPublish (Flash Communication Publish):
static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
{
RTMPPacket pkt;
uint8_t *p;
int ret;
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
0, 25 + strlen(rt->playpath))) < 0)
return ret;
av_log(s, AV_LOG_DEBUG, "FCPublish stream...\n");
p = pkt.data;
ff_amf_write_string(&p, "FCPublish");
ff_amf_write_number(&p, ++rt->nb_invokes);
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
return rtmp_send_packet(rt, &pkt, 1);
}
3.5.2 Publish命令
位置: libavformat/rtmpproto.c:887-909
static int gen_publish(URLContext *s, RTMPContext *rt)
{
RTMPPacket pkt;
uint8_t *p;
int ret;
av_log(s, AV_LOG_DEBUG, "Sending publish command for '%s'\n", rt->playpath);
// 1. 创建RTMP包
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE,
0, 30 + strlen(rt->playpath))) < 0)
return ret;
pkt.extra = rt->stream_id; // 设置流ID
p = pkt.data;
// 2. 写入AMF数据
ff_amf_write_string(&p, "publish"); // 命令名
ff_amf_write_number(&p, ++rt->nb_invokes); // 事务ID
ff_amf_write_null(&p); // 命令对象
ff_amf_write_string(&p, rt->playpath); // 流名称
ff_amf_write_string(&p, "live"); // 发布类型
// 3. 发送包
return rtmp_send_packet(rt, &pkt, 1);
}
Publish命令格式:
String: "publish"
Number: transaction_id (3.0)
Null
String: stream_name ("livestream")
String: publishing_type ("live" | "record" | "append")
发布类型:
- live: 实时流,不录制
- record: 录制流,覆盖同名文件
- append: 录制流,追加到文件
服务器响应:
String: "onStatus"
Number: 0.0
Null
Object: {
level: "status"
code: "NetStream.Publish.Start"
description: "Stream is now published."
}
3.6 数据传输
3.6.1 数据包写入流程
位置: libavformat/rtmpproto.c:3014-3149
static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
{
RTMPContext *rt = s->priv_data;
int size_temp = size;
int pktsize, pkttype, copy;
uint32_t ts;
const uint8_t *buf_temp = buf;
int ret;
do {
// 1. 跳过FLV前一个包的尾部 (4字节)
if (rt->skip_bytes) {
int skip = FFMIN(rt->skip_bytes, size_temp);
buf_temp += skip;
size_temp -= skip;
rt->skip_bytes -= skip;
continue;
}
// 2. 读取FLV Tag Header (11字节)
if (rt->flv_header_bytes < RTMP_HEADER) {
copy = FFMIN(RTMP_HEADER - rt->flv_header_bytes, size_temp);
bytestream_get_buffer(&buf_temp,
rt->flv_header + rt->flv_header_bytes, copy);
rt->flv_header_bytes += copy;
size_temp -= copy;
if (rt->flv_header_bytes < RTMP_HEADER)
break;
// 3. 解析FLV Tag Header
const uint8_t *header = rt->flv_header;
int channel = RTMP_AUDIO_CHANNEL;
pkttype = bytestream_get_byte(&header); // Tag类型
pktsize = bytestream_get_be24(&header); // 数据大小
ts = bytestream_get_be24(&header); // 时间戳低24位
ts |= bytestream_get_byte(&header) << 24; // 时间戳高8位
bytestream_get_be24(&header); // StreamID (总是0)
rt->flv_size = pktsize;
// 4. 确定RTMP通道
if (pkttype == RTMP_PT_VIDEO)
channel = RTMP_VIDEO_CHANNEL;
// 5. 对于第一个音视频包或元数据,强制发送完整头
if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
pkttype == RTMP_PT_NOTIFY) {
if ((ret = ff_rtmp_check_alloc_array(&rt->prev_pkt[1],
&rt->nb_prev_pkt[1],
channel)) < 0)
return ret;
rt->prev_pkt[1][channel].channel_id = 0;
}
// 6. 创建RTMP包
if ((ret = ff_rtmp_packet_create(&rt->out_pkt, channel,
pkttype, ts, pktsize)) < 0)
return ret;
rt->out_pkt.extra = (rt->listen) ? rt->nb_streamid : rt->stream_id;
rt->flv_data = rt->out_pkt.data;
}
// 7. 复制数据到RTMP包
copy = FFMIN(rt->flv_size - rt->flv_off, size_temp);
bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, copy);
rt->flv_off += copy;
size_temp -= copy;
// 8. 包完整时发送
if (rt->flv_off == rt->flv_size) {
rt->skip_bytes = 4; // 下次跳过4字节 (PreviousTagSize)
// 9. 处理元数据包 (添加@setDataFrame前缀)
if (rt->out_pkt.type == RTMP_PT_NOTIFY) {
uint8_t commandbuffer[64];
int stringlen = 0;
GetByteContext gbc;
bytestream2_init(&gbc, rt->flv_data, rt->flv_size);
if (!ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer),
&stringlen)) {
if (!strcmp(commandbuffer, "onMetaData") ||
!strcmp(commandbuffer, "|RtmpSampleAccess")) {
uint8_t *ptr;
if ((ret = av_reallocp(&rt->out_pkt.data,
rt->out_pkt.size + 16)) < 0) {
rt->flv_size = rt->flv_off = rt->flv_header_bytes = 0;
return ret;
}
memmove(rt->out_pkt.data + 16, rt->out_pkt.data,
rt->out_pkt.size);
rt->out_pkt.size += 16;
ptr = rt->out_pkt.data;
ff_amf_write_string(&ptr, "@setDataFrame");
}
}
}
// 10. 发送RTMP包
if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
return ret;
rt->flv_size = 0;
rt->flv_off = 0;
rt->flv_header_bytes = 0;
rt->flv_nb_packets++;
}
} while (buf_temp - buf < size);
// 11. 定期检查服务器消息
if (rt->flv_nb_packets < rt->flush_interval)
return size;
rt->flv_nb_packets = 0;
// 设置非阻塞模式
rt->stream->flags |= AVIO_FLAG_NONBLOCK;
ret = ffurl_read(rt->stream, &c, 1);
rt->stream->flags &= ~AVIO_FLAG_NONBLOCK;
if (ret == 1) {
RTMPPacket rpkt = { 0 };
if ((ret = ff_rtmp_packet_read_internal(rt->stream, &rpkt,
rt->in_chunk_size,
&rt->prev_pkt[0],
&rt->nb_prev_pkt[0], c)) <= 0)
return ret;
// 处理服务器消息
if ((ret = rtmp_parse_result(s, rt, &rpkt)) < 0)
return ret;
ff_rtmp_packet_destroy(&rpkt);
}
return size;
}
3.6.2 FLV Tag格式
FLV Tag Header (11字节):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| TagType | DataSize (24 bits) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Timestamp (24 bits) | TimestampExt |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| StreamID (24 bits, always 0) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Tag Data |
| ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
TagType:
- 8: 音频
- 9: 视频
- 18: 脚本数据 (元数据)
音频Tag Data:
0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
|F| R |S| T | 音频头
+-+-+-+-+-+-+-+-+
| Audio Data | 音频数据
| ... |
+-+-+-+-+-+-+-+-+
F: SoundFormat (4 bits)
- 10: AAC
R: SoundRate (2 bits)
- 0: 5.5kHz, 1: 11kHz, 2: 22kHz, 3: 44kHz
S: SoundSize (1 bit)
- 0: 8bit, 1: 16bit
T: SoundType (1 bit)
- 0: Mono, 1: Stereo
视频Tag Data:
0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
|F| C | P | 视频头
+-+-+-+-+-+-+-+-+
| Video Data | 视频数据
| ... |
+-+-+-+-+-+-+-+-+
F: FrameType (4 bits)
- 1: keyframe (I帧)
- 2: inter frame (P帧)
- 3: disposable inter frame (B帧)
C: CodecID (4 bits)
- 7: AVC (H.264)
P: AVCPacketType (8 bits, 仅AVC)
- 0: AVC sequence header (SPS/PPS)
- 1: AVC NALU
- 2: AVC end of sequence
四、RTMP Chunk协议
4.1 Chunk概念
RTMP将消息分割成更小的块 (Chunk) 进行传输,以支持多路复用。
特点:
- 默认chunk size: 128字节
- 可通过RTMP_PT_CHUNK_SIZE消息修改
- 不同通道的chunk可以交错发送
- 支持头部压缩
4.2 Chunk格式
位置: libavformat/rtmppkt.h:67-72
enum RTMPPacketSize {
RTMP_PS_TWELVEBYTES = 0, ///< 12字节头 (完整头)
RTMP_PS_EIGHTBYTES, ///< 8字节头 (无Message Stream ID)
RTMP_PS_FOURBYTES, ///< 4字节头 (仅时间戳delta)
RTMP_PS_ONEBYTE ///< 1字节头 (续传chunk)
};
Chunk Header格式:
Type 0 (12字节) - 完整头:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|fmt| cs id | timestamp (3 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (3 bytes) |message type id|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message stream id (4 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| extended timestamp (4 bytes, if timestamp==0xFFFFFF)
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Type 1 (8字节) - 无Stream ID:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|fmt| cs id | timestamp delta (3 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (3 bytes) |message type id|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Type 2 (4字节) - 仅时间戳delta:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|fmt| cs id | timestamp delta (3 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Type 3 (1字节) - 续传chunk:
0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
|fmt| cs id |
+-+-+-+-+-+-+-+-+
4.3 Chunk写入实现
位置: libavformat/rtmppkt.c:310-400
int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
int chunk_size, RTMPPacket **prev_pkt_ptr,
int *nb_prev_pkt)
{
uint8_t pkt_hdr[16], *p = pkt_hdr;
int mode = RTMP_PS_TWELVEBYTES;
int off = 0;
int written = 0;
int ret;
RTMPPacket *prev_pkt;
int use_delta;
uint32_t timestamp;
if ((ret = ff_rtmp_check_alloc_array(prev_pkt_ptr, nb_prev_pkt,
pkt->channel_id)) < 0)
return ret;
prev_pkt = *prev_pkt_ptr;
// 1. 判断是否可以使用delta时间戳
use_delta = prev_pkt[pkt->channel_id].channel_id &&
pkt->extra == prev_pkt[pkt->channel_id].extra &&
pkt->timestamp >= prev_pkt[pkt->channel_id].timestamp;
timestamp = pkt->timestamp;
if (use_delta) {
timestamp -= prev_pkt[pkt->channel_id].timestamp;
}
// 2. 处理扩展时间戳
if (timestamp >= 0xFFFFFF) {
pkt->ts_field = 0xFFFFFF;
} else {
pkt->ts_field = timestamp;
}
// 3. 确定chunk头类型
if (use_delta) {
if (pkt->type == prev_pkt[pkt->channel_id].type &&
pkt->size == prev_pkt[pkt->channel_id].size) {
mode = RTMP_PS_FOURBYTES; // Type 2
if (pkt->ts_field == prev_pkt[pkt->channel_id].ts_field)
mode = RTMP_PS_ONEBYTE; // Type 3
} else {
mode = RTMP_PS_EIGHTBYTES; // Type 1
}
}
// 4. 写入Basic Header
if (pkt->channel_id < 64) {
bytestream_put_byte(&p, pkt->channel_id | (mode << 6));
} else if (pkt->channel_id < 64 + 256) {
bytestream_put_byte(&p, 0 | (mode << 6));
bytestream_put_byte(&p, pkt->channel_id - 64);
} else {
bytestream_put_byte(&p, 1 | (mode << 6));
bytestream_put_le16(&p, pkt->channel_id - 64);
}
// 5. 写入Message Header
if (mode != RTMP_PS_ONEBYTE) {
bytestream_put_be24(&p, pkt->ts_field);
if (mode != RTMP_PS_FOURBYTES) {
bytestream_put_be24(&p, pkt->size);
bytestream_put_byte(&p, pkt->type);
if (mode == RTMP_PS_TWELVEBYTES) {
bytestream_put_le32(&p, pkt->extra);
}
}
}
// 6. 写入扩展时间戳
if (pkt->ts_field == 0xFFFFFF) {
bytestream_put_be32(&p, timestamp);
}
// 7. 发送chunk头
if (ffurl_write(h, pkt_hdr, p - pkt_hdr) < 0)
return AVERROR(EIO);
written = p - pkt_hdr + pkt->size;
// 8. 分chunk发送数据
while (off < pkt->size) {
int towrite = FFMIN(chunk_size, pkt->size - off);
if (ffurl_write(h, pkt->data + off, towrite) < 0)
return AVERROR(EIO);
off += towrite;
// 9. 后续chunk使用Type 3头
if (off < pkt->size) {
uint8_t marker = 0xC0 | pkt->channel_id;
if (ffurl_write(h, &marker, 1) < 0)
return AVERROR(EIO);
if (pkt->ts_field == 0xFFFFFF) {
uint8_t ts_ext[4];
bytestream_put_be32(&ts_ext, timestamp);
if (ffurl_write(h, ts_ext, 4) < 0)
return AVERROR(EIO);
}
}
}
// 10. 保存历史信息
prev_pkt[pkt->channel_id].channel_id = pkt->channel_id;
prev_pkt[pkt->channel_id].type = pkt->type;
prev_pkt[pkt->channel_id].size = pkt->size;
prev_pkt[pkt->channel_id].timestamp = pkt->timestamp;
prev_pkt[pkt->channel_id].ts_field = pkt->ts_field;
prev_pkt[pkt->channel_id].extra = pkt->extra;
return written;
}
4.4 Chunk示例
发送一个1000字节的视频包 (chunk_size=128):
Chunk 1 (Type 0, 12字节头 + 128字节数据):
[0x06] [0x00 0x00 0x64] [0x00 0x03 0xE8] [0x09] [0x00 0x00 0x00 0x01]
[128 bytes data]
Chunk 2 (Type 3, 1字节头 + 128字节数据):
[0xC6]
[128 bytes data]
Chunk 3 (Type 3, 1字节头 + 128字节数据):
[0xC6]
[128 bytes data]
...
Chunk 8 (Type 3, 1字节头 + 104字节数据):
[0xC6]
[104 bytes data]
五、RTMP通道和消息类型
5.1 通道定义
位置: libavformat/rtmppkt.h:36-42
enum RTMPChannel {
RTMP_NETWORK_CHANNEL = 2, ///< 网络控制消息 (带宽报告、ping等)
RTMP_SYSTEM_CHANNEL, ///< 服务器控制消息 (connect、createStream等)
RTMP_AUDIO_CHANNEL, ///< 音频数据
RTMP_VIDEO_CHANNEL = 6, ///< 视频数据
RTMP_SOURCE_CHANNEL = 8, ///< 音视频调用 (publish、play等)
};
5.2 消息类型
位置: libavformat/rtmppkt.h:47-62
typedef enum RTMPPacketType {
RTMP_PT_CHUNK_SIZE = 1, ///< 设置chunk大小
RTMP_PT_BYTES_READ = 3, ///< 已读字节数确认
RTMP_PT_USER_CONTROL, ///< 用户控制消息
RTMP_PT_WINDOW_ACK_SIZE, ///< 窗口确认大小
RTMP_PT_SET_PEER_BW, ///< 设置对等带宽
RTMP_PT_AUDIO = 8, ///< 音频包
RTMP_PT_VIDEO, ///< 视频包
RTMP_PT_FLEX_STREAM = 15, ///< Flex共享流
RTMP_PT_FLEX_OBJECT, ///< Flex共享对象
RTMP_PT_FLEX_MESSAGE, ///< Flex共享消息
RTMP_PT_NOTIFY, ///< 通知 (元数据)
RTMP_PT_SHARED_OBJ, ///< 共享对象
RTMP_PT_INVOKE, ///< 调用 (命令)
RTMP_PT_METADATA = 22, ///< FLV元数据
} RTMPPacketType;
5.3 控制消息
Chunk Size (Type 1):
static int handle_chunk_size(URLContext *s, RTMPPacket *pkt)
{
RTMPContext *rt = s->priv_data;
if (pkt->size < 4) {
av_log(s, AV_LOG_ERROR, "Too short chunk size change packet (%d)\n",
pkt->size);
return AVERROR_INVALIDDATA;
}
rt->in_chunk_size = AV_RB32(pkt->data);
if (rt->in_chunk_size <= 0) {
av_log(s, AV_LOG_ERROR, "Incorrect chunk size %d\n",
rt->in_chunk_size);
return AVERROR_INVALIDDATA;
}
av_log(s, AV_LOG_DEBUG, "New incoming chunk size = %d\n",
rt->in_chunk_size);
return 0;
}
Window Acknowledgement Size (Type 5):
static int handle_window_ack_size(URLContext *s, RTMPPacket *pkt)
{
RTMPContext *rt = s->priv_data;
if (pkt->size < 4) {
av_log(s, AV_LOG_ERROR,
"Too short window acknowledgement size packet (%d)\n",
pkt->size);
return AVERROR_INVALIDDATA;
}
rt->receive_report_size = AV_RB32(pkt->data);
if (rt->receive_report_size <= 0) {
av_log(s, AV_LOG_ERROR, "Incorrect window acknowledgement size %d\n",
rt->receive_report_size);
return AVERROR_INVALIDDATA;
}
av_log(s, AV_LOG_DEBUG, "Window acknowledgement size = %d\n",
rt->receive_report_size);
return 0;
}
Set Peer Bandwidth (Type 6):
static int handle_set_peer_bw(URLContext *s, RTMPPacket *pkt)
{
RTMPContext *rt = s->priv_data;
if (pkt->size < 4) {
av_log(s, AV_LOG_ERROR,
"Too short set peer bandwidth packet (%d)\n",
pkt->size);
return AVERROR_INVALIDDATA;
}
rt->max_sent_unacked = AV_RB32(pkt->data);
av_log(s, AV_LOG_DEBUG, "Max sent, unacked = %d\n",
rt->max_sent_unacked);
return 0;
}
六、AMF编码
6.1 AMF简介
AMF (Action Message Format) 是Adobe的二进制数据格式,用于序列化对象。
AMF0数据类型:
#define AMF_DATA_TYPE_NUMBER 0x00
#define AMF_DATA_TYPE_BOOL 0x01
#define AMF_DATA_TYPE_STRING 0x02
#define AMF_DATA_TYPE_OBJECT 0x03
#define AMF_DATA_TYPE_NULL 0x05
#define AMF_DATA_TYPE_UNDEFINED 0x06
#define AMF_DATA_TYPE_ARRAY 0x08
#define AMF_DATA_TYPE_OBJECT_END 0x09
#define AMF_DATA_TYPE_MIXEDARRAY 0x0A
6.2 AMF编码示例
Number (8字节):
[0x00] [IEEE 754 double, 8 bytes]
String:
[0x02] [length, 2 bytes] [string data]
Object:
[0x03]
[property name length, 2 bytes] [property name]
[property value type] [property value]
...
[0x00 0x00 0x09] // Object End
Null:
[0x05]
6.3 Connect命令AMF示例
02 00 07 "connect" // String: "connect"
00 3F F0 00 00 00 00 00 00 // Number: 1.0
03 // Object start
00 03 "app" // Property: "app"
02 00 04 "live" // String: "live"
00 04 "type" // Property: "type"
02 00 0A "nonprivate" // String: "nonprivate"
00 08 "flashVer" // Property: "flashVer"
02 00 0E "FMLE/3.0 ..." // String: "FMLE/3.0..."
00 05 "tcUrl" // Property: "tcUrl"
02 00 1F "rtmp://..." // String: "rtmp://..."
00 00 09 // Object end
七、完整推流示例
7.1 推流命令
ffmpeg -re -i input.mp4 -c copy -f flv rtmp://server/live/stream
7.2 网络交互时序
客户端 服务器
│ │
│ 1. TCP SYN │
├────────────────────────────────────────▶│
│ 2. TCP SYN-ACK │
│◀────────────────────────────────────────┤
│ 3. TCP ACK │
├────────────────────────────────────────▶│
│ │
│ 4. C0+C1 (Handshake) │
├────────────────────────────────────────▶│
│ 5. S0+S1+S2 │
│◀────────────────────────────────────────┤
│ 6. C2 │
├────────────────────────────────────────▶│
│ │
│ 7. Set Chunk Size (4096) │
├────────────────────────────────────────▶│
│ 8. Connect("live") │
├────────────────────────────────────────▶│
│ 9. Window Ack Size │
│◀────────────────────────────────────────┤
│ 10. Set Peer Bandwidth │
│◀────────────────────────────────────────┤
│ 11. User Control (Stream Begin) │
│◀────────────────────────────────────────┤
│ 12. Set Chunk Size │
│◀────────────────────────────────────────┤
│ 13. _result(NetConnection.Connect.Success)│
│◀────────────────────────────────────────┤
│ 14. onBWDone │
│◀────────────────────────────────────────┤
│ │
│ 15. releaseStream("stream") │
├────────────────────────────────────────▶│
│ 16. FCPublish("stream") │
├────────────────────────────────────────▶│
│ 17. createStream() │
├────────────────────────────────────────▶│
│ 18. _result(stream_id=1) │
│◀────────────────────────────────────────┤
│ │
│ 19. publish("stream", "live") │
├────────────────────────────────────────▶│
│ 20. onStatus(NetStream.Publish.Start) │
│◀────────────────────────────────────────┤
│ │
│ 21. @setDataFrame("onMetaData", {...}) │
├────────────────────────────────────────▶│
│ 22. Video (AVC Sequence Header) │
├────────────────────────────────────────▶│
│ 23. Audio (AAC Sequence Header) │
├────────────────────────────────────────▶│
│ 24. Video (Keyframe) │
├────────────────────────────────────────▶│
│ 25. Audio Data │
├────────────────────────────────────────▶│
│ 26. Video (P-frame) │
├────────────────────────────────────────▶│
│ ... │
│ (持续发送音视频数据) │
│ ... │
│ │
│ N. deleteStream(stream_id) │
├────────────────────────────────────────▶│
│ N+1. TCP FIN │
├────────────────────────────────────────▶│
7.3 元数据示例
{
"duration": 0,
"width": 1920,
"height": 1080,
"videodatarate": 2500,
"framerate": 30,
"videocodecid": 7, // AVC
"audiodatarate": 128,
"audiosamplerate": 48000,
"audiosamplesize": 16,
"stereo": true,
"audiocodecid": 10, // AAC
"encoder": "Lavf58.76.100"
}
八、关键数据结构
8.1 RTMPContext
位置: libavformat/rtmpproto.c:79-137
typedef struct RTMPContext {
const AVClass *class;
URLContext* stream; ///< TCP流
RTMPPacket *prev_pkt[2]; ///< 包历史 ([0]读, [1]写)
int nb_prev_pkt[2]; ///< prev_pkt元素数
int in_chunk_size; ///< 接收chunk大小
int out_chunk_size; ///< 发送chunk大小
int is_input; ///< 输入/输出标志
char *playpath; ///< 流标识符
int live; ///< 0:录制, -1:直播, -2:两者
char *app; ///< 应用名称
char *conn; ///< 附加AMF数据
ClientState state; ///< 当前状态
int stream_id; ///< 服务器分配的流ID
uint8_t* flv_data; ///< demuxer缓冲区
int flv_size; ///< 当前缓冲区大小
int flv_off; ///< 已读字节数
int flv_nb_packets; ///< 已发布的flv包数
RTMPPacket out_pkt; ///< 输出rtmp包
uint32_t receive_report_size; ///< 报告接收字节数的阈值
uint64_t bytes_read; ///< 从服务器读取的字节数
uint64_t last_bytes_read; ///< 上次报告的字节数
uint32_t last_timestamp; ///< 最后接收包的时间戳
int skip_bytes; ///< 下次写入时跳过的字节数
int has_audio; ///< 是否有音频
int has_video; ///< 是否有视频
int received_metadata; ///< 是否接收到元数据
uint8_t flv_header[RTMP_HEADER]; ///< 部分flv包头
int flv_header_bytes; ///< flv_header中已初始化字节数
int nb_invokes; ///< invoke消息计数
char* tcurl; ///< 目标流URL
char* flashver; ///< flash插件版本
char* swfurl; ///< swf播放器URL
char* pageurl; ///< 网页URL
int max_sent_unacked; ///< 最大未确认发送字节数
int client_buffer_time; ///< 客户端缓冲时间(ms)
int flush_interval; ///< 刷新间隔(包数)
int encrypted; ///< 使用加密连接(RTMPE)
TrackedMethod*tracked_methods; ///< 跟踪的方法缓冲区
int nb_tracked_methods; ///< 跟踪的方法数
int listen; ///< 监听模式标志
int tcp_nodelay; ///< 使用TCP_NODELAY
} RTMPContext;
8.2 RTMPPacket
位置: libavformat/rtmppkt.h:77-87
typedef struct RTMPPacket {
int channel_id; ///< 通道ID
RTMPPacketType type; ///< 包类型
uint32_t timestamp; ///< 完整时间戳
uint32_t ts_field; ///< 24位时间戳或增量(ms)
uint32_t extra; ///< 额外的通道ID
uint8_t *data; ///< 包payload
int size; ///< payload大小
int offset; ///< 已读数据量
int read; ///< 已读总量(包括头)
} RTMPPacket;
九、性能优化和最佳实践
9.1 Chunk Size优化
// 默认chunk size较小 (128字节),增大可以减少头部开销
rt->out_chunk_size = 4096; // 推荐值
// 发送Set Chunk Size消息
RTMPPacket pkt;
ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
RTMP_PT_CHUNK_SIZE, 0, 4);
uint8_t *p = pkt.data;
bytestream_put_be32(&p, rt->out_chunk_size);
rtmp_send_packet(rt, &pkt, 0);
9.2 TCP_NODELAY
// 禁用Nagle算法,减少延迟
rt->tcp_nodelay = 1;
9.3 缓冲区管理
// 客户端缓冲时间设置
rt->client_buffer_time = 3000; // 3秒
// 发送Buffer Time消息
gen_buffer_time(s, rt);
9.4 错误处理
// 定期检查服务器消息
if (rt->flv_nb_packets >= rt->flush_interval) {
rt->flv_nb_packets = 0;
// 非阻塞读取
rt->stream->flags |= AVIO_FLAG_NONBLOCK;
ret = ffurl_read(rt->stream, &c, 1);
rt->stream->flags &= ~AVIO_FLAG_NONBLOCK;
if (ret == 1) {
// 处理服务器消息
rtmp_parse_result(s, rt, &rpkt);
}
}
十、常见问题和调试
10.1 连接失败
问题: 无法连接到RTMP服务器
排查:
- 检查网络连通性
- 验证URL格式:
rtmp://server[:port]/app/stream - 检查防火墙设置
- 查看服务器日志
10.2 握手失败
问题: Handshake验证失败
原因:
- 服务器要求复杂握手,但客户端不支持
- 加密握手配置错误
解决:
# 使用简单握手
ffmpeg -rtmp_enhanced_codecs "" -i input.mp4 -f flv rtmp://server/live/stream
10.3 推流卡顿
问题: 推流过程中出现卡顿
原因:
- 网络带宽不足
- Chunk size太小
- 服务器处理能力不足
解决:
# 降低码率
ffmpeg -i input.mp4 -b:v 1000k -b:a 128k -f flv rtmp://server/live/stream
# 增大chunk size (在代码中设置)
rt->out_chunk_size = 8192;
10.4 调试技巧
启用详细日志:
ffmpeg -loglevel debug -i input.mp4 -f flv rtmp://server/live/stream
抓包分析:
tcpdump -i any -w rtmp.pcap port 1935
wireshark rtmp.pcap
代码调试:
// 在关键位置添加日志
av_log(s, AV_LOG_DEBUG, "Sending packet: type=%d, size=%d, ts=%u\n",
pkt.type, pkt.size, pkt.timestamp);
// 启用包转储
#define DEBUG
ff_rtmp_packet_dump(s, &pkt);
十一、总结
RTMP推流的完整流程:
- TCP连接: 建立到服务器的TCP连接
- 握手: 三次握手建立RTMP连接
- 连接: 发送connect命令,协商参数
- 创建流: 发送createStream,获取stream_id
- 发布: 发送publish命令,开始推流
- 数据传输:
- 发送元数据 (@setDataFrame)
- 发送AVC/AAC Sequence Header
- 持续发送音视频数据
- 结束: 发送deleteStream,关闭连接
关键特性:
- 分块传输: Chunk机制支持多路复用
- 头部压缩: 4种chunk头类型减少开销
- AMF编码: 高效的二进制序列化
- 低延迟: 1-3秒延迟,适合实时互动
- 可靠传输: 基于TCP,保证数据完整性
适用场景:
- 直播推流
- 视频会议
- 在线教育
- 游戏直播
更多推荐


所有评论(0)