1 Star 0 Fork 13

MeenJ. / Node-Media-Server

forked from nygula / Node-Media-Server 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
node_rtmp_client.js 23.35 KB
一键复制 编辑 原始数据 按行查看 历史
Chen Mingliang 提交于 2020-07-16 19:50 . Add eslint support
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791
//
// Created by Mingliang Chen on 18/6/21.
// illuspas[a]gmail.com
// Copyright (c) 2018 Nodemedia. All rights reserved.
//
const EventEmitter = require('events');
const Logger = require('./node_core_logger');
const Crypto = require('crypto');
const Url = require('url');
const Net = require('net');
const AMF = require('./node_core_amf');
const FLASHVER = "LNX 9,0,124,2";
const RTMP_OUT_CHUNK_SIZE = 60000;
const RTMP_PORT = 1935;
const RTMP_HANDSHAKE_SIZE = 1536;
const RTMP_HANDSHAKE_UNINIT = 0;
const RTMP_HANDSHAKE_0 = 1;
const RTMP_HANDSHAKE_1 = 2;
const RTMP_HANDSHAKE_2 = 3;
const RTMP_PARSE_INIT = 0;
const RTMP_PARSE_BASIC_HEADER = 1;
const RTMP_PARSE_MESSAGE_HEADER = 2;
const RTMP_PARSE_EXTENDED_TIMESTAMP = 3;
const RTMP_PARSE_PAYLOAD = 4;
const RTMP_CHUNK_HEADER_MAX = 18;
const RTMP_CHUNK_TYPE_0 = 0; // 11-bytes: timestamp(3) + length(3) + stream type(1) + stream id(4)
const RTMP_CHUNK_TYPE_1 = 1; // 7-bytes: delta(3) + length(3) + stream type(1)
const RTMP_CHUNK_TYPE_2 = 2; // 3-bytes: delta(3)
const RTMP_CHUNK_TYPE_3 = 3; // 0-byte
const RTMP_CHANNEL_PROTOCOL = 2;
const RTMP_CHANNEL_INVOKE = 3;
const RTMP_CHANNEL_AUDIO = 4;
const RTMP_CHANNEL_VIDEO = 5;
const RTMP_CHANNEL_DATA = 6;
const rtmpHeaderSize = [11, 7, 3, 0];
/* Protocol Control Messages */
const RTMP_TYPE_SET_CHUNK_SIZE = 1;
const RTMP_TYPE_ABORT = 2;
const RTMP_TYPE_ACKNOWLEDGEMENT = 3; // bytes read report
const RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE = 5; // server bandwidth
const RTMP_TYPE_SET_PEER_BANDWIDTH = 6; // client bandwidth
/* User Control Messages Event (4) */
const RTMP_TYPE_EVENT = 4;
const RTMP_TYPE_AUDIO = 8;
const RTMP_TYPE_VIDEO = 9;
/* Data Message */
const RTMP_TYPE_FLEX_STREAM = 15; // AMF3
const RTMP_TYPE_DATA = 18; // AMF0
/* Shared Object Message */
const RTMP_TYPE_FLEX_OBJECT = 16; // AMF3
const RTMP_TYPE_SHARED_OBJECT = 19; // AMF0
/* Command Message */
const RTMP_TYPE_FLEX_MESSAGE = 17; // AMF3
const RTMP_TYPE_INVOKE = 20; // AMF0
/* Aggregate Message */
const RTMP_TYPE_METADATA = 22;
const RTMP_CHUNK_SIZE = 128;
const RTMP_PING_TIME = 60000;
const RTMP_PING_TIMEOUT = 30000;
const STREAM_BEGIN = 0x00;
const STREAM_EOF = 0x01;
const STREAM_DRY = 0x02;
const STREAM_EMPTY = 0x1f;
const STREAM_READY = 0x20;
const RTMP_TRANSACTION_CONNECT = 1;
const RTMP_TRANSACTION_CREATE_STREAM = 2;
const RTMP_TRANSACTION_GET_STREAM_LENGTH = 3;
const RtmpPacket = {
create: (fmt = 0, cid = 0) => {
return {
header: {
fmt: fmt,
cid: cid,
timestamp: 0,
length: 0,
type: 0,
stream_id: 0
},
clock: 0,
delta: 0,
payload: null,
capacity: 0,
bytes: 0
};
}
};
class NodeRtmpClient {
constructor(rtmpUrl) {
this.url = rtmpUrl;
this.info = this.rtmpUrlParser(rtmpUrl);
this.isPublish = false;
this.launcher = new EventEmitter();
this.handshakePayload = Buffer.alloc(RTMP_HANDSHAKE_SIZE);
this.handshakeState = RTMP_HANDSHAKE_UNINIT;
this.handshakeBytes = 0;
this.parserBuffer = Buffer.alloc(RTMP_CHUNK_HEADER_MAX);
this.parserState = RTMP_PARSE_INIT;
this.parserBytes = 0;
this.parserBasicBytes = 0;
this.parserPacket = null;
this.inPackets = new Map();
this.inChunkSize = RTMP_CHUNK_SIZE;
this.outChunkSize = RTMP_CHUNK_SIZE;
this.streamId = 0;
this.isSocketOpen = false;
}
onSocketData(data) {
let bytes = data.length;
let p = 0;
let n = 0;
while (bytes > 0) {
switch (this.handshakeState) {
case RTMP_HANDSHAKE_UNINIT:
// read s0
// Logger.debug('[rtmp client] read s0');
this.handshakeState = RTMP_HANDSHAKE_0;
this.handshakeBytes = 0;
bytes -= 1;
p += 1;
break;
case RTMP_HANDSHAKE_0:
// read s1
n = RTMP_HANDSHAKE_SIZE - this.handshakeBytes;
n = n <= bytes ? n : bytes;
data.copy(this.handshakePayload, this.handshakeBytes, p, p + n);
this.handshakeBytes += n;
bytes -= n;
p += n;
if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) {
// Logger.debug('[rtmp client] read s1');
this.handshakeState = RTMP_HANDSHAKE_1;
this.handshakeBytes = 0;
this.socket.write(this.handshakePayload);// write c2;
// Logger.debug('[rtmp client] write c2');
}
break;
case RTMP_HANDSHAKE_1:
//read s2
n = RTMP_HANDSHAKE_SIZE - this.handshakeBytes;
n = n <= bytes ? n : bytes;
data.copy(this.handshakePayload, this.handshakeBytes, p, n);
this.handshakeBytes += n;
bytes -= n;
p += n;
if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) {
// Logger.debug('[rtmp client] read s2');
this.handshakeState = RTMP_HANDSHAKE_2;
this.handshakeBytes = 0;
this.handshakePayload = null;
this.rtmpSendConnect();
}
break;
case RTMP_HANDSHAKE_2:
return this.rtmpChunkRead(data, p, bytes);
}
}
}
onSocketError(e) {
Logger.error('rtmp_client', "onSocketError", e);
this.isSocketOpen = false;
this.stop();
}
onSocketClose() {
// Logger.debug('rtmp_client', "onSocketClose");
this.isSocketOpen = false;
this.stop();
}
onSocketTimeout() {
// Logger.debug('rtmp_client', "onSocketTimeout");
this.isSocketOpen = false;
this.stop();
}
on(event, callback) {
this.launcher.on(event, callback);
}
startPull() {
this._start();
}
startPush() {
this.isPublish = true;
this._start();
}
_start() {
this.socket = Net.createConnection(this.info.port, this.info.hostname, () => {
//rtmp handshark c0c1
let c0c1 = Crypto.randomBytes(1537);
c0c1.writeUInt8(3);
c0c1.writeUInt32BE(Date.now() / 1000, 1);
c0c1.writeUInt32BE(0, 5);
this.socket.write(c0c1);
// Logger.debug('[rtmp client] write c0c1');
});
this.socket.on('data', this.onSocketData.bind(this));
this.socket.on('error', this.onSocketError.bind(this));
this.socket.on('close', this.onSocketClose.bind(this));
this.socket.on('timeout', this.onSocketTimeout.bind(this));
this.socket.setTimeout(60000);
}
stop() {
if (this.streamId > 0) {
if (!this.socket.destroyed) {
if (this.isPublish) {
this.rtmpSendFCUnpublish();
}
this.rtmpSendDeleteStream();
this.socket.destroy();
}
this.streamId = 0;
this.launcher.emit('close');
}
}
pushAudio(audioData, timestamp) {
if (this.streamId == 0) return;
let packet = RtmpPacket.create();
packet.header.fmt = RTMP_CHUNK_TYPE_0;
packet.header.cid = RTMP_CHANNEL_AUDIO;
packet.header.type = RTMP_TYPE_AUDIO;
packet.payload = audioData;
packet.header.length = packet.payload.length;
packet.header.timestamp = timestamp;
let rtmpChunks = this.rtmpChunksCreate(packet);
this.socket.write(rtmpChunks);
}
pushVideo(videoData, timestamp) {
if (this.streamId == 0) return;
let packet = RtmpPacket.create();
packet.header.fmt = RTMP_CHUNK_TYPE_0;
packet.header.cid = RTMP_CHANNEL_VIDEO;
packet.header.type = RTMP_TYPE_VIDEO;
packet.payload = videoData;
packet.header.length = packet.payload.length;
packet.header.timestamp = timestamp;
let rtmpChunks = this.rtmpChunksCreate(packet);
this.socket.write(rtmpChunks);
}
pushScript(scriptData, timestamp) {
if (this.streamId == 0) return;
let packet = RtmpPacket.create();
packet.header.fmt = RTMP_CHUNK_TYPE_0;
packet.header.cid = RTMP_CHANNEL_DATA;
packet.header.type = RTMP_TYPE_DATA;
packet.payload = scriptData;
packet.header.length = packet.payload.length;
packet.header.timestamp = timestamp;
let rtmpChunks = this.rtmpChunksCreate(packet);
this.socket.write(rtmpChunks);
}
rtmpUrlParser(url) {
let urlInfo = Url.parse(url, true);
urlInfo.app = urlInfo.path.split('/')[1];
urlInfo.port = !!urlInfo.port ? urlInfo.port : RTMP_PORT;
urlInfo.tcurl = urlInfo.href.match(/rtmp:\/\/([^\/]+)\/([^\/]+)/)[0];
urlInfo.stream = urlInfo.path.slice(urlInfo.app.length + 2);
return urlInfo;
}
rtmpChunkBasicHeaderCreate(fmt, cid) {
let out;
if (cid >= 64 + 255) {
out = Buffer.alloc(3);
out[0] = (fmt << 6) | 1;
out[1] = (cid - 64) & 0xFF;
out[2] = ((cid - 64) >> 8) & 0xFF;
} else if (cid >= 64) {
out = Buffer.alloc(2);
out[0] = (fmt << 6) | 0;
out[1] = (cid - 64) & 0xFF;
} else {
out = Buffer.alloc(1);
out[0] = (fmt << 6) | cid;
}
return out;
}
rtmpChunkMessageHeaderCreate(header) {
let out = Buffer.alloc(rtmpHeaderSize[header.fmt % 4]);
if (header.fmt <= RTMP_CHUNK_TYPE_2) {
out.writeUIntBE(header.timestamp >= 0xffffff ? 0xffffff : header.timestamp, 0, 3);
}
if (header.fmt <= RTMP_CHUNK_TYPE_1) {
out.writeUIntBE(header.length, 3, 3);
out.writeUInt8(header.type, 6);
}
if (header.fmt === RTMP_CHUNK_TYPE_0) {
out.writeUInt32LE(header.stream_id, 7);
}
return out;
}
rtmpChunksCreate(packet) {
let header = packet.header;
let payload = packet.payload;
let payloadSize = header.length;
let chunkSize = this.outChunkSize;
let chunksOffset = 0;
let payloadOffset = 0;
let chunkBasicHeader = this.rtmpChunkBasicHeaderCreate(header.fmt, header.cid);
let chunkBasicHeader3 = this.rtmpChunkBasicHeaderCreate(RTMP_CHUNK_TYPE_3, header.cid);
let chunkMessageHeader = this.rtmpChunkMessageHeaderCreate(header);
let useExtendedTimestamp = header.timestamp >= 0xffffff;
let headerSize = chunkBasicHeader.length + chunkMessageHeader.length + (useExtendedTimestamp ? 4 : 0);
let n = headerSize + payloadSize + Math.floor(payloadSize / chunkSize);
if (useExtendedTimestamp) {
n += Math.floor(payloadSize / chunkSize) * 4;
}
if (!(payloadSize % chunkSize)) {
n -= 1;
if (useExtendedTimestamp) { //TODO CHECK
n -= 4;
}
}
let chunks = Buffer.alloc(n);
chunkBasicHeader.copy(chunks, chunksOffset);
chunksOffset += chunkBasicHeader.length;
chunkMessageHeader.copy(chunks, chunksOffset);
chunksOffset += chunkMessageHeader.length;
if (useExtendedTimestamp) {
chunks.writeUInt32BE(header.timestamp, chunksOffset);
chunksOffset += 4;
}
while (payloadSize > 0) {
if (payloadSize > chunkSize) {
payload.copy(chunks, chunksOffset, payloadOffset, payloadOffset + chunkSize);
payloadSize -= chunkSize;
chunksOffset += chunkSize;
payloadOffset += chunkSize;
chunkBasicHeader3.copy(chunks, chunksOffset);
chunksOffset += chunkBasicHeader3.length;
if (useExtendedTimestamp) {
chunks.writeUInt32BE(header.timestamp, chunksOffset);
chunksOffset += 4;
}
} else {
payload.copy(chunks, chunksOffset, payloadOffset, payloadOffset + payloadSize);
payloadSize -= payloadSize;
chunksOffset += payloadSize;
payloadOffset += payloadSize;
}
}
return chunks;
}
rtmpChunkRead(data, p, bytes) {
let size = 0;
let offset = 0;
let extended_timestamp = 0;
while (offset < bytes) {
switch (this.parserState) {
case RTMP_PARSE_INIT:
this.parserBytes = 1;
this.parserBuffer[0] = data[p + offset++];
if (0 === (this.parserBuffer[0] & 0x3F)) {
this.parserBasicBytes = 2;
} else if (1 === (this.parserBuffer[0] & 0x3F)) {
this.parserBasicBytes = 3;
} else {
this.parserBasicBytes = 1;
}
this.parserState = RTMP_PARSE_BASIC_HEADER;
break;
case RTMP_PARSE_BASIC_HEADER:
while (this.parserBytes < this.parserBasicBytes && offset < bytes) {
this.parserBuffer[this.parserBytes++] = data[p + offset++];
}
if (this.parserBytes >= this.parserBasicBytes) {
this.parserState = RTMP_PARSE_MESSAGE_HEADER;
}
break;
case RTMP_PARSE_MESSAGE_HEADER:
size = rtmpHeaderSize[this.parserBuffer[0] >> 6] + this.parserBasicBytes;
while (this.parserBytes < size && offset < bytes) {
this.parserBuffer[this.parserBytes++] = data[p + offset++];
}
if (this.parserBytes >= size) {
this.rtmpPacketParse();
this.parserState = RTMP_PARSE_EXTENDED_TIMESTAMP;
}
break;
case RTMP_PARSE_EXTENDED_TIMESTAMP:
size = rtmpHeaderSize[this.parserPacket.header.fmt] + this.parserBasicBytes;
if (this.parserPacket.header.timestamp === 0xFFFFFF) size += 4;
while (this.parserBytes < size && offset < bytes) {
this.parserBuffer[this.parserBytes++] = data[p + offset++];
}
if (this.parserBytes >= size) {
if (this.parserPacket.header.timestamp === 0xFFFFFF) {
extended_timestamp = this.parserBuffer.readUInt32BE(rtmpHeaderSize[this.parserPacket.header.fmt] + this.parserBasicBytes);
}
if (0 === this.parserPacket.bytes) {
if (RTMP_CHUNK_TYPE_0 === this.parserPacket.header.fmt) {
this.parserPacket.clock = 0xFFFFFF === this.parserPacket.header.timestamp ? extended_timestamp : this.parserPacket.header.timestamp;
this.parserPacket.delta = 0;
} else {
this.parserPacket.delta = 0xFFFFFF === this.parserPacket.header.timestamp ? extended_timestamp : this.parserPacket.header.timestamp;
}
this.rtmpPacketAlloc();
}
this.parserState = RTMP_PARSE_PAYLOAD;
}
break;
case RTMP_PARSE_PAYLOAD:
size = Math.min(this.inChunkSize - (this.parserPacket.bytes % this.inChunkSize), this.parserPacket.header.length - this.parserPacket.bytes);
size = Math.min(size, bytes - offset);
if (size > 0) {
data.copy(this.parserPacket.payload, this.parserPacket.bytes, p + offset, p + offset + size);
}
this.parserPacket.bytes += size;
offset += size;
if (this.parserPacket.bytes >= this.parserPacket.header.length) {
this.parserState = RTMP_PARSE_INIT;
this.parserPacket.bytes = 0;
this.parserPacket.clock += this.parserPacket.delta;
this.rtmpHandler();
} else if (0 === (this.parserPacket.bytes % this.inChunkSize)) {
this.parserState = RTMP_PARSE_INIT;
}
break;
}
}
}
rtmpPacketParse() {
let fmt = this.parserBuffer[0] >> 6;
let cid = 0;
if (this.parserBasicBytes === 2) {
cid = 64 + this.parserBuffer[1];
} else if (this.parserBasicBytes === 3) {
cid = 64 + this.parserBuffer[1] + this.parserBuffer[2] << 8;
} else {
cid = this.parserBuffer[0] & 0x3F;
}
let hasp = this.inPackets.has(cid);
if (!hasp) {
this.parserPacket = RtmpPacket.create(fmt, cid);
this.inPackets.set(cid, this.parserPacket);
} else {
this.parserPacket = this.inPackets.get(cid);
}
this.parserPacket.header.fmt = fmt;
this.parserPacket.header.cid = cid;
this.rtmpChunkMessageHeaderRead();
// Logger.log(this.parserPacket);
}
rtmpChunkMessageHeaderRead() {
let offset = this.parserBasicBytes;
// timestamp / delta
if (this.parserPacket.header.fmt <= RTMP_CHUNK_TYPE_2) {
this.parserPacket.header.timestamp = this.parserBuffer.readUIntBE(offset, 3);
offset += 3;
}
// message length + type
if (this.parserPacket.header.fmt <= RTMP_CHUNK_TYPE_1) {
this.parserPacket.header.length = this.parserBuffer.readUIntBE(offset, 3);
this.parserPacket.header.type = this.parserBuffer[offset + 3];
offset += 4;
}
if (this.parserPacket.header.fmt === RTMP_CHUNK_TYPE_0) {
this.parserPacket.header.stream_id = this.parserBuffer.readUInt32LE(offset);
offset += 4;
}
return offset;
}
rtmpPacketAlloc() {
if (this.parserPacket.capacity < this.parserPacket.header.length) {
this.parserPacket.payload = Buffer.alloc(this.parserPacket.header.length + 1024);
this.parserPacket.capacity = this.parserPacket.header.length + 1024;
}
}
rtmpHandler() {
switch (this.parserPacket.header.type) {
case RTMP_TYPE_SET_CHUNK_SIZE:
case RTMP_TYPE_ABORT:
case RTMP_TYPE_ACKNOWLEDGEMENT:
case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE:
case RTMP_TYPE_SET_PEER_BANDWIDTH:
return 0 === this.rtmpControlHandler() ? -1 : 0;
case RTMP_TYPE_EVENT:
return 0 === this.rtmpEventHandler() ? -1 : 0;
case RTMP_TYPE_AUDIO:
return this.rtmpAudioHandler();
case RTMP_TYPE_VIDEO:
return this.rtmpVideoHandler();
case RTMP_TYPE_FLEX_MESSAGE:
case RTMP_TYPE_INVOKE:
return this.rtmpInvokeHandler();
case RTMP_TYPE_FLEX_STREAM:// AMF3
case RTMP_TYPE_DATA: // AMF0
return this.rtmpDataHandler();
}
}
rtmpControlHandler() {
let payload = this.parserPacket.payload;
switch (this.parserPacket.header.type) {
case RTMP_TYPE_SET_CHUNK_SIZE:
this.inChunkSize = payload.readUInt32BE();
// Logger.debug('set inChunkSize', this.inChunkSize);
break;
case RTMP_TYPE_ABORT:
break;
case RTMP_TYPE_ACKNOWLEDGEMENT:
break;
case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE:
this.ackSize = payload.readUInt32BE();
// Logger.debug('set ack Size', this.ackSize);
break;
case RTMP_TYPE_SET_PEER_BANDWIDTH:
break;
}
}
rtmpEventHandler() {
let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length);
let event = payload.readUInt16BE();
let value = payload.readUInt32BE(2);
// Logger.log('rtmpEventHandler', event, value);
switch (event) {
case 6:
this.rtmpSendPingResponse(value);
break;
}
}
rtmpInvokeHandler() {
let offset = this.parserPacket.header.type === RTMP_TYPE_FLEX_MESSAGE ? 1 : 0;
let payload = this.parserPacket.payload.slice(offset, this.parserPacket.header.length);
let invokeMessage = AMF.decodeAmf0Cmd(payload);
// Logger.log('rtmpInvokeHandler', invokeMessage);
switch (invokeMessage.cmd) {
case '_result':
this.rtmpCommandOnresult(invokeMessage);
break;
case '_error':
this.rtmpCommandOnerror(invokeMessage);
break;
case 'onStatus':
this.rtmpCommandOnstatus(invokeMessage);
break;
}
}
rtmpCommandOnresult(invokeMessage) {
// Logger.debug(invokeMessage);
switch (invokeMessage.transId) {
case RTMP_TRANSACTION_CONNECT:
this.launcher.emit('status', invokeMessage.info);
this.rtmpOnconnect();
break;
case RTMP_TRANSACTION_CREATE_STREAM:
this.rtmpOncreateStream(invokeMessage.info);
break;
}
}
rtmpCommandOnerror(invokeMessage) {
this.launcher.emit('status', invokeMessage.info);
}
rtmpCommandOnstatus(invokeMessage) {
this.launcher.emit('status', invokeMessage.info);
}
rtmpOnconnect() {
if (this.isPublish) {
this.rtmpSendReleaseStream();
this.rtmpSendFCPublish();
}
this.rtmpSendCreateStream();
}
rtmpOncreateStream(sid) {
this.streamId = sid;
if (this.isPublish) {
this.rtmpSendPublish();
this.rtmpSendSetChunkSize();
} else {
this.rtmpSendPlay();
this.rtmpSendSetBufferLength(1000);
}
}
rtmpAudioHandler() {
let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length);
this.launcher.emit('audio', payload, this.parserPacket.clock);
}
rtmpVideoHandler() {
let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length);
this.launcher.emit('video', payload, this.parserPacket.clock);
}
rtmpDataHandler() {
let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length);
this.launcher.emit('script', payload, this.parserPacket.clock);
}
sendInvokeMessage(sid, opt) {
let packet = RtmpPacket.create();
packet.header.fmt = RTMP_CHUNK_TYPE_0;
packet.header.cid = RTMP_CHANNEL_INVOKE;
packet.header.type = RTMP_TYPE_INVOKE;
packet.header.stream_id = sid;
packet.payload = AMF.encodeAmf0Cmd(opt);
packet.header.length = packet.payload.length;
let chunks = this.rtmpChunksCreate(packet);
this.socket.write(chunks);
}
rtmpSendConnect() {
let opt = {
cmd: 'connect',
transId: RTMP_TRANSACTION_CONNECT,
cmdObj: {
app: this.info.app,
flashVer: FLASHVER,
tcUrl: this.info.tcurl,
fpad: 0,
capabilities: 15,
audioCodecs: 3191,
videoCodecs: 252,
videoFunction: 1,
encoding: 0
}
}
this.sendInvokeMessage(0, opt);
}
rtmpSendReleaseStream() {
let opt = {
cmd: 'releaseStream',
transId: 0,
cmdObj: null,
streamName: this.info.stream,
};
this.sendInvokeMessage(this.streamId, opt);
}
rtmpSendFCPublish() {
let opt = {
cmd: 'FCPublish',
transId: 0,
cmdObj: null,
streamName: this.info.stream,
};
this.sendInvokeMessage(this.streamId, opt);
}
rtmpSendCreateStream() {
let opt = {
cmd: 'createStream',
transId: RTMP_TRANSACTION_CREATE_STREAM,
cmdObj: null
};
this.sendInvokeMessage(0, opt);
}
rtmpSendPlay() {
let opt = {
cmd: 'play',
transId: 0,
cmdObj: null,
streamName: this.info.stream,
start: -2,
duration: -1,
reset: 1
};
this.sendInvokeMessage(this.streamId, opt);
}
rtmpSendSetBufferLength(bufferTime) {
let packet = RtmpPacket.create();
packet.header.fmt = RTMP_CHUNK_TYPE_0;
packet.header.cid = RTMP_CHANNEL_PROTOCOL;
packet.header.type = RTMP_TYPE_EVENT;
packet.payload = Buffer.alloc(10);
packet.header.length = packet.payload.length;
packet.payload.writeUInt16BE(0x03);
packet.payload.writeUInt32BE(this.streamId, 2);
packet.payload.writeUInt32BE(bufferTime, 6);
let chunks = this.rtmpChunksCreate(packet);
this.socket.write(chunks);
}
rtmpSendPublish() {
let opt = {
cmd: 'publish',
transId: 0,
cmdObj: null,
streamName: this.info.stream,
type: 'live'
};
this.sendInvokeMessage(this.streamId, opt);
}
rtmpSendSetChunkSize() {
let rtmpBuffer = Buffer.from('02000000000004010000000000000000', 'hex');
rtmpBuffer.writeUInt32BE(this.inChunkSize, 12);
this.socket.write(rtmpBuffer);
this.outChunkSize = this.inChunkSize;
}
rtmpSendFCUnpublish() {
let opt = {
cmd: 'FCUnpublish',
transId: 0,
cmdObj: null,
streamName: this.info.stream,
};
this.sendInvokeMessage(this.streamId, opt);
}
rtmpSendDeleteStream() {
let opt = {
cmd: 'deleteStream',
transId: 0,
cmdObj: null,
streamId: this.streamId
};
this.sendInvokeMessage(this.streamId, opt);
}
rtmpSendPingResponse(time) {
let packet = RtmpPacket.create();
packet.header.fmt = RTMP_CHUNK_TYPE_0;
packet.header.cid = RTMP_CHANNEL_PROTOCOL;
packet.header.type = RTMP_TYPE_EVENT;
packet.payload = Buffer.alloc(6);
packet.header.length = packet.payload.length;
packet.payload.writeUInt16BE(0x07);
packet.payload.writeUInt32BE(time, 2);
let chunks = this.rtmpChunksCreate(packet);
this.socket.write(chunks);
}
}
module.exports = NodeRtmpClient
1
https://gitee.com/MQueenJ/Node-Media-Server.git
git@gitee.com:MQueenJ/Node-Media-Server.git
MQueenJ
Node-Media-Server
Node-Media-Server
master

搜索帮助