From 8ad7f70938892d82311ab3cecba0ed9a5b40fc02 Mon Sep 17 00:00:00 2001 From: eguid Date: Thu, 20 May 2021 16:00:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=AC=E7=89=88=E6=9C=AC=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E8=BD=AC=E5=B0=81=E8=A3=85=E5=92=8C=E8=BD=AC=E7=A0=81=EF=BC=8C?= =?UTF-8?q?=20=E5=8F=AF=E4=BB=A5=E6=A0=B9=E6=8D=AE=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E6=BA=90=E7=9A=84=E9=9F=B3=E8=A7=86=E9=A2=91=E7=BC=96=E7=A0=81?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=88=A4=E6=96=AD=E6=98=AF=E8=BD=AC=E5=B0=81?= =?UTF-8?q?=E8=A3=85=E8=BF=98=E6=98=AF=E8=BD=AC=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/zj/controller/StreamController.java | 4 +- .../java/com/zj/service/MediaService.java | 15 +- .../com/zj/thread/MediaRecodeOrTransfer.java | 493 ++++++++++++++++++ 3 files changed, 504 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/zj/thread/MediaRecodeOrTransfer.java diff --git a/src/main/java/com/zj/controller/StreamController.java b/src/main/java/com/zj/controller/StreamController.java index 0a7f4fc..6e0cdf1 100644 --- a/src/main/java/com/zj/controller/StreamController.java +++ b/src/main/java/com/zj/controller/StreamController.java @@ -11,6 +11,7 @@ import com.zj.entity.Camera; import com.zj.service.CameraRepository; import com.zj.service.MediaService; import com.zj.thread.MediaConvert; +import com.zj.thread.MediaRecodeOrTransfer; import com.zj.vo.Result; import cn.hutool.crypto.digest.MD5; @@ -61,7 +62,8 @@ public class StreamController { Collection values = CameraRepository.cameraMap.values(); for (Camera camera : values) { String digestHex = MD5.create().digestHex(camera.getUrl()); - MediaConvert mediaConvert = MediaService.cameras.get(digestHex); + MediaRecodeOrTransfer mediaConvert = MediaService.cameras.get(digestHex); +// MediaConvert mediaConvert = MediaService.cameras.get(digestHex); if(mediaConvert != null) { camera.setStatus(mediaConvert.isRuning()); } else { diff --git a/src/main/java/com/zj/service/MediaService.java b/src/main/java/com/zj/service/MediaService.java index 54f9634..da4b2ce 100644 --- a/src/main/java/com/zj/service/MediaService.java +++ b/src/main/java/com/zj/service/MediaService.java @@ -6,6 +6,7 @@ import org.springframework.stereotype.Service; import com.zj.entity.Camera; import com.zj.thread.MediaConvert; +import com.zj.thread.MediaRecodeOrTransfer; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.crypto.digest.MD5; @@ -21,7 +22,7 @@ import io.netty.channel.ChannelHandlerContext; public class MediaService { // 缓存流转换线程 - public static ConcurrentHashMap cameras = new ConcurrentHashMap<>(); + public static ConcurrentHashMap cameras = new ConcurrentHashMap<>(); /** * @@ -33,11 +34,11 @@ public class MediaService { String mediaKey = MD5.create().digestHex(camera.getUrl()); if (cameras.containsKey(mediaKey)) { - MediaConvert mediaConvert = cameras.get(mediaKey); + MediaRecodeOrTransfer mediaConvert = cameras.get(mediaKey); cameras.put(mediaKey, mediaConvert); mediaConvert.addHttpClient(ctx); } else { - MediaConvert mediaConvert = new MediaConvert(camera, autoClose); + MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, autoClose); cameras.put(mediaKey, mediaConvert); ThreadUtil.execute(mediaConvert); mediaConvert.addHttpClient(ctx); @@ -50,11 +51,11 @@ public class MediaService { String mediaKey = MD5.create().digestHex(camera.getUrl()); if (cameras.containsKey(mediaKey)) { - MediaConvert mediaConvert = cameras.get(mediaKey); + MediaRecodeOrTransfer mediaConvert = cameras.get(mediaKey); cameras.put(mediaKey, mediaConvert); mediaConvert.addWsClient(ctx); } else { - MediaConvert mediaConvert = new MediaConvert(camera, autoClose); + MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, autoClose); cameras.put(mediaKey, mediaConvert); ThreadUtil.execute(mediaConvert); mediaConvert.addWsClient(ctx); @@ -71,7 +72,7 @@ public class MediaService { String mediaKey = MD5.create().digestHex(camera.getUrl()); if (!cameras.containsKey(mediaKey)) { - MediaConvert mediaConvert = new MediaConvert(camera, false); + MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, false); cameras.put(mediaKey, mediaConvert); ThreadUtil.execute(mediaConvert); } @@ -87,7 +88,7 @@ public class MediaService { String mediaKey = MD5.create().digestHex(camera.getUrl()); if (cameras.containsKey(mediaKey)) { - MediaConvert mediaConvert = cameras.get(mediaKey); + MediaRecodeOrTransfer mediaConvert = cameras.get(mediaKey); mediaConvert.setRuning(false); cameras.remove(mediaKey); } diff --git a/src/main/java/com/zj/thread/MediaRecodeOrTransfer.java b/src/main/java/com/zj/thread/MediaRecodeOrTransfer.java new file mode 100644 index 0000000..2e05b27 --- /dev/null +++ b/src/main/java/com/zj/thread/MediaRecodeOrTransfer.java @@ -0,0 +1,493 @@ +package com.zj.thread; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.ffmpeg.global.avcodec; +import org.bytedeco.ffmpeg.global.avutil; +import org.bytedeco.javacv.FFmpegFrameGrabber; +import org.bytedeco.javacv.FFmpegFrameRecorder; +import org.bytedeco.javacv.FFmpegLogCallback; +import org.bytedeco.javacv.Frame; +import org.bytedeco.javacv.FrameGrabber.Exception; + +import com.zj.entity.Camera; +import com.zj.service.MediaService; + +import cn.hutool.crypto.digest.MD5; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import lombok.extern.slf4j.Slf4j; + +/** + * 支持转复用或转码线程 + * 什么情况下会转复用? + *

视频源的音视频编码必须是浏览器和flv规范两者同时支持的编码,比如H264/AAC,

+ *

否则将进行转码。

+ *

不支持hevc、vvc、vp8、vp9、g711、g771a等编码

+ * @author eguid + */ +@Slf4j +public class MediaRecodeOrTransfer extends Thread { + static { + avutil.av_log_set_level(avutil.AV_LOG_ERROR); + FFmpegLogCallback.set(); + } + + /** + * ws客户端 + */ + private ConcurrentHashMap wsClients = new ConcurrentHashMap<>(); + /** + * http客户端 + */ + private ConcurrentHashMap httpClients = new ConcurrentHashMap<>(); + + /** + * 运行状态 + */ + private boolean runing = false; + + private boolean grabberStatus = false; + + private boolean recorderStatus = false; + + /** + * 是否可以自动关闭流 + */ + private boolean autoClose = true; + + private int hcSize, wcSize = 0; + + /** + * 没有客户端计数 + */ + private int noClient = 0; + + /** + * flv header + */ + private byte[] header = null; + // 输出流,视频最终会输出到此 + private ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + FFmpegFrameGrabber grabber;//拉流器 + FFmpegFrameRecorder recorder;//推流录制器 + + /** + * true:转复用,false:转码 + */ + boolean transferFlag=false;//默认转码 + + /** + * 相机 + */ + private Camera camera; + + /** + * @param camera + * @param auto 流是否可以自动关闭 + */ + public MediaRecodeOrTransfer(Camera camera, boolean autoClose) { + super(); + this.autoClose = autoClose; + this.camera = camera; + } + + public boolean isRuning() { + return runing; + } + + public void setRuning(boolean runing) { + this.runing = runing; + } + + /** + * 创建拉流器 + * @return + */ + protected boolean createGrabber() { + // 拉流器 + grabber = new FFmpegFrameGrabber(camera.getUrl()); + // 超时时间(15秒) + grabber.setOption("stimoout", "15000000"); + grabber.setOption("threads", "1"); + grabber.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); + // 设置缓存大小,提高画质、减少卡顿花屏 + grabber.setOption("buffer_size", "1024000"); + // 如果为rtsp流,增加配置 + if ("rtsp".equals(camera.getUrl().substring(0, 4))) { + // 设置打开协议tcp / udp + grabber.setOption("rtsp_transport", "tcp"); + //首选TCP进行RTP传输 + grabber.setOption("rtsp_flags", "prefer_tcp"); + //设置超时时间 + grabber.setOption("stimeout","3000000"); + } + + try { + grabber.start(); + return grabberStatus = true; + } catch (Exception e) { + e.printStackTrace(); + } + return grabberStatus = false; + } + + /** + * 创建转码推流录制器 + * @return + */ + protected boolean createTransterOrRecodeRecorder() { + recorder = new FFmpegFrameRecorder(bos, grabber.getImageWidth(), grabber.getImageHeight(),grabber.getAudioChannels()); + recorder.setFormat("flv"); + if(!transferFlag) { + //转码 + recorder.setInterleaved(false); + recorder.setVideoOption("tune", "zerolatency"); + recorder.setVideoOption("preset", "ultrafast"); + recorder.setVideoOption("crf", "26"); + recorder.setVideoOption("threads", "1"); + recorder.setFrameRate(25);// 设置帧率 + recorder.setGopSize(25);// 设置gop,与帧率相同,相当于间隔1秒chan's一个关键帧 +// recorder.setVideoBitrate(500 * 1000);// 码率500kb/s + recorder.setVideoCodecName("libx264"); +// recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); + recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); +// recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); + recorder.setAudioCodecName("aac"); + try { + recorder.start(); + return recorderStatus=true; + } catch (org.bytedeco.javacv.FrameRecorder.Exception e1) { + log.info("启动转码录制器失败", e1); + e1.printStackTrace(); + } + }else { + //转复用 + try { + recorder.start(grabber.getFormatContext()); + return recorderStatus=true; + } catch (org.bytedeco.javacv.FrameRecorder.Exception e1) { + log.info("启动转复用录制器失败", e1); + e1.printStackTrace(); + } + } + return recorderStatus=false; + } + + /** + * 是否支持flv的音视频编码 + * @return + */ + private boolean supportFlvFormatCodec() { + int vcodec=grabber.getVideoCodec(); + int acodec=grabber.getAudioCodec(); + return (avcodec.AV_CODEC_ID_H264==vcodec||avcodec.AV_CODEC_ID_H263==vcodec)&&(avcodec.AV_CODEC_ID_AAC==acodec||avcodec.AV_CODEC_ID_AAC_LATM==acodec); + } + + /** + * 将视频源转换为flv + */ + protected void transferStream2Flv() { + if(!createGrabber()) { + return; + } + transferFlag = supportFlvFormatCodec(); + if(!createTransterOrRecodeRecorder()) { + return; + } + + try { + grabber.flush(); + } catch (Exception e) { + log.info("清空拉流器缓存失败", e); + e.printStackTrace(); + } + if (header == null) { + header = bos.toByteArray(); +// System.out.println(HexUtil.encodeHexStr(header)); + bos.reset(); + } + + runing = true; + //时间戳计算 + long startTime = 0; + long videoTS = 0; + long lastTime=0; + //累积延迟计算 + long latencyDifference=0;//延迟差值 + long maxLatencyThreshold=3000;//最大延迟阈值,如果lastLatencyDifference-latencyDifference>maxLatencyThreshold,则重启拉流器 + long lastLatencyDifference=0;//当前最新延迟差值, + + long processTime=0;//上一帧处理耗时,用于延迟时间补偿,处理耗时不算进累积延迟 + for(;runing && grabberStatus && recorderStatus;) { + + lastTime=System.currentTimeMillis(); + //累积延迟过大,则重新建立连接 + if (lastLatencyDifference-latencyDifference>maxLatencyThreshold) { + try { + grabber.restart(); // grabber.grabFrame() avformat + grabber.flush(); + log.warn("\r\n{}\r\n重连成功》》》", camera.getUrl()); + } catch (Exception e) { + log.warn("\r\n{}\r\n重连失败!", camera.getUrl()); + //跳出循环,销毁拉流器和录制器 + break; + } + } + + hasClient(); + + try { + if(transferFlag) { + log.error("转复用流程"); + //转复用 + AVPacket pkt = grabber.grabPacket(); + if (null!=pkt&&!pkt.isNull()) { + if (startTime == 0) { + startTime = System.currentTimeMillis(); + } + videoTS = 1000 * (System.currentTimeMillis() - startTime); + // 判断时间偏移 + if (videoTS > recorder.getTimestamp()) { + //System.out.println("矫正时间戳: " + videoTS + " : " + recorder.getTimestamp() + " -> " + //+ (videoTS - recorder.getTimestamp())); + recorder.setTimestamp((videoTS)); + } + recorder.recordPacket(pkt); + } + }else { + log.error("转码流程"); + //转码 + Frame frame = grabber.grabFrame(); + if (frame != null) { + if (startTime == 0) { + startTime = System.currentTimeMillis(); + } + videoTS = 1000 * (System.currentTimeMillis() - startTime); + // 判断时间偏移 + if (videoTS > recorder.getTimestamp()) { + //System.out.println("矫正时间戳: " + videoTS + " : " + recorder.getTimestamp() + " -> " + //+ (videoTS - recorder.getTimestamp())); + recorder.setTimestamp((videoTS)); + } + recorder.record(frame); + } + } + } catch (Exception e) { + //log.info("\r\n{}\r\n尝试重连。。。", camera.getUrl()); + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + } + //e.printStackTrace(); + } catch (org.bytedeco.javacv.FrameRecorder.Exception e) { + //runing = false; + log.info("\r\n{}\r\n录制器出现异常。。。", camera.getUrl()); + e.printStackTrace(); + } + if (bos.size() > 0) { + byte[] b = bos.toByteArray(); + bos.reset(); + + // ws输出帧流 + for (Entry entry : wsClients.entrySet()) { + try { + if (entry.getValue().channel().isWritable()) { + entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(b))); + } else { + wsClients.remove(entry.getKey()); + hasClient(); + } + } catch (java.lang.Exception e) { + wsClients.remove(entry.getKey()); + hasClient(); + e.printStackTrace(); + } + } + + // http + for (Entry entry : httpClients.entrySet()) { + try { + if (entry.getValue().channel().isWritable()) { + entry.getValue().writeAndFlush(Unpooled.copiedBuffer(b)); + } else { + httpClients.remove(entry.getKey()); + hasClient(); + } + } catch (java.lang.Exception e) { + httpClients.remove(entry.getKey()); + hasClient(); + e.printStackTrace(); + } + } + //流程耗时记录 + if(lastTime>0) { + processTime=System.currentTimeMillis()-lastTime; + } + } + } + + // close包含stop和release方法。录制文件必须保证最后执行stop()方法 + try { + recorder.close(); + grabber.close(); + bos.close(); + } catch (org.bytedeco.javacv.FrameRecorder.Exception e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + runing = false; + } + log.info("关闭媒体流,{} ", camera.getUrl()); + } + + /** + * 新增ws客戶端 + * + * @param session + */ + public void addWsClient(ChannelHandlerContext ctx) { + int timeout = 0; + while (true) { + try { + if (runing) { + try { + if (ctx.channel().isWritable()) { + // 发送帧前先发送header + ChannelFuture future = ctx + .writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(header))); + future.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + wsClients.put(ctx.channel().id().toString(), ctx); + } + } + }); + } + + } catch (java.lang.Exception e) { + e.printStackTrace(); + } + break; + } + + // 等待推拉流启动 + Thread.sleep(100); + // 启动录制器失败 + timeout += 100; + if (timeout > 15000) { + break; + } + } catch (java.lang.Exception e) { + e.printStackTrace(); + } + } + } + + /** + * 关闭流 + * + * @return + */ + public void hasClient() { + + int newHcSize = httpClients.size(); + int newWcSize = wsClients.size(); + if (hcSize != newHcSize || wcSize != newWcSize) { + hcSize = newHcSize; + wcSize = newWcSize; + log.info("\r\n{}\r\nhttp连接数:{}, ws连接数:{} \r\n", camera.getUrl(), newHcSize, newWcSize); + } + + // 自动拉流无需关闭 + if (!autoClose) { + if (httpClients.isEmpty() && wsClients.isEmpty()) { + try { + Thread.sleep(5); // 不能太久 + } catch (InterruptedException e) { + } + } + return; + } + if (httpClients.isEmpty() && wsClients.isEmpty()) { + // 5*2000=10000=10,等待10秒还没有客户端,则关闭推流 + if (noClient > 2000) { + runing = false; + String mediaKey = MD5.create().digestHex(camera.getUrl()); + MediaService.cameras.remove(mediaKey); + + } else { + try { + Thread.sleep(5); + } catch (InterruptedException e) { + } + noClient += 1; + } + } else { + noClient = 0; + } + } + + /** + * 新增http客戶端 + * + * @param session + */ + public void addHttpClient(ChannelHandlerContext ctx) { + int timeout = 0; + while (true) { + try { + if (runing) { + try { + if (ctx.channel().isWritable()) { + // 发送帧前先发送header + ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(header)); + future.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + httpClients.put(ctx.channel().id().toString(), ctx); + } + } + }); + } + + } catch (java.lang.Exception e) { + e.printStackTrace(); + } + break; + } + + // 等待推拉流启动 + Thread.sleep(100); + + // 启动录制器失败 + timeout += 100; + if (timeout > 15000) { + break; + } + } catch (java.lang.Exception e) { + e.printStackTrace(); + } + } + } + + @Override + public void run() { + transferStream2Flv(); + } + +} -- Gitee