/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#include "MediaPipeline.h"
#ifndef USE_FAKE_MEDIA_STREAMS
#include "MediaStreamGraphImpl.h"
#endif
#include <math.h>
#include "nspr.h"
#include "srtp.h"
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
#include "VideoSegment.h"
#include "Layers.h"
#include "LayersLogging.h"
#include "ImageTypes.h"
#include "ImageContainer.h"
#include "DOMMediaStream.h"
#include "MediaStreamTrack.h"
#include "MediaStreamListener.h"
#include "MediaStreamVideoSink.h"
#include "VideoUtils.h"
#include "VideoStreamTrack.h"
#ifdef WEBRTC_GONK
#include "GrallocImages.h"
#include "mozilla/layers/GrallocTextureClient.h"
#endif
#endif
#include "nsError.h"
#include "AudioSegment.h"
#include "MediaSegment.h"
#include "MediaPipelineFilter.h"
#include "databuffer.h"
#include "transportflow.h"
#include "transportlayer.h"
#include "transportlayerdtls.h"
#include "transportlayerice.h"
#include "runnable_utils.h"
#include "libyuv/convert.h"
#include "mozilla/SharedThreadPool.h"
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
#include "mozilla/PeerIdentity.h"
#include "mozilla/TaskQueue.h"
#endif
#include "mozilla/gfx/Point.h"
#include "mozilla/gfx/Types.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/UniquePtrExtensions.h"
#include "mozilla/Sprintf.h"
#include "webrtc/common_types.h"
#include "webrtc/common_video/interface/native_handle.h"
#include "webrtc/common_video/libyuv/include/webrtc_libyuv.h"
#include "webrtc/video_engine/include/vie_errors.h"
#include "logging.h"
// Max size given stereo is 480*2*2 = 1920 (10ms of 16-bit stereo audio at
// 48 kHz)
#define AUDIO_SAMPLE_BUFFER_MAX (480*2*2)
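// Worked arithmetic: at the 48 kHz default rate one 10 ms packet is
// 48000 / 100 = 480 frames; with 16-bit (2-byte) samples and up to 2
// channels that is 480 * 2 * 2 = 1920 bytes, which is what the
// static_assert below verifies against WEBRTC_DEFAULT_SAMPLE_RATE.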
static_assert((WEBRTC_DEFAULT_SAMPLE_RATE/100)*sizeof(uint16_t) * 2
<= AUDIO_SAMPLE_BUFFER_MAX,
"AUDIO_SAMPLE_BUFFER_MAX is not large enough");
using namespace mozilla;
using namespace mozilla::dom;
using namespace mozilla::gfx;
using namespace mozilla::layers;
// Logging context
MOZ_MTLOG_MODULE("mediapipeline")
namespace mozilla {
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
class VideoConverterListener
{
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(VideoConverterListener)
virtual void OnVideoFrameConverted(unsigned char* aVideoFrame,
unsigned int aVideoFrameLength,
unsigned short aWidth,
unsigned short aHeight,
VideoType aVideoType,
uint64_t aCaptureTime) = 0;
virtual void OnVideoFrameConverted(webrtc::I420VideoFrame& aVideoFrame) = 0;
protected:
virtual ~VideoConverterListener() {}
};
// I420 buffer size macros
#define YSIZE(x,y) ((x)*(y))
#define CRSIZE(x,y) ((((x)+1) >> 1) * (((y)+1) >> 1))
#define I420SIZE(x,y) (YSIZE((x),(y)) + 2 * CRSIZE((x),(y)))
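// Worked example: for a 640x480 frame, YSIZE = 640*480 = 307200 and
// CRSIZE = 320*240 = 76800 per chroma plane, so
// I420SIZE = 307200 + 2*76800 = 460800 bytes (i.e. width*height*3/2).
// The (x+1)>>1 rounding keeps odd dimensions safe, e.g. CRSIZE(641,481)
// covers 321*241 chroma samples.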
// An async video frame format converter.
//
// Input is typically a MediaStream(Track)Listener driven by MediaStreamGraph.
//
// We keep track of the size of the TaskQueue so we can drop frames if
// conversion is taking too long.
//
// Output is passed through to all added VideoConverterListeners on a TaskQueue
// thread whenever a frame is converted.
class VideoFrameConverter
{
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(VideoFrameConverter)
VideoFrameConverter()
: mLength(0)
, last_img_(-1) // -1 is not a guaranteed invalid serial. See bug 1262134.
, disabled_frame_sent_(false)
#ifdef DEBUG
, mThrottleCount(0)
, mThrottleRecord(0)
#endif
, mMutex("VideoFrameConverter")
{
MOZ_COUNT_CTOR(VideoFrameConverter);
RefPtr<SharedThreadPool> pool =
SharedThreadPool::Get(NS_LITERAL_CSTRING("VideoFrameConverter"));
mTaskQueue = MakeAndAddRef<TaskQueue>(pool.forget());
}
void QueueVideoChunk(VideoChunk& aChunk, bool aForceBlack)
{
if (aChunk.IsNull()) {
return;
}
// We get passed duplicate frames every ~10ms even with no frame change.
int32_t serial = aChunk.mFrame.GetImage()->GetSerial();
if (serial == last_img_) {
return;
}
last_img_ = serial;
// A throttling limit of 1 allows us to convert 2 frames concurrently.
// It's short enough to not build up too significant a delay, while
// giving us a margin to not cause some machines to drop every other frame.
const int32_t queueThrottlingLimit = 1;
if (mLength > queueThrottlingLimit) {
MOZ_MTLOG(ML_DEBUG, "VideoFrameConverter " << this << " queue is full." <<
" Throttling by throwing away a frame.");
#ifdef DEBUG
++mThrottleCount;
mThrottleRecord = std::max(mThrottleCount, mThrottleRecord);
#endif
return;
}
#ifdef DEBUG
if (mThrottleCount > 0) {
auto level = ML_DEBUG;
if (mThrottleCount > 5) {
// Log at a higher level when we have large drops.
level = ML_INFO;
}
MOZ_MTLOG(level, "VideoFrameConverter " << this << " stopped" <<
" throttling after throwing away " << mThrottleCount <<
" frames. Longest throttle so far was " <<
mThrottleRecord << " frames.");
mThrottleCount = 0;
}
#endif
bool forceBlack = aForceBlack || aChunk.mFrame.GetForceBlack();
if (forceBlack) {
// Reset the last-img check.
// -1 is not a guaranteed invalid serial. See bug 1262134.
last_img_ = -1;
if (disabled_frame_sent_) {
// After disabling we pass just one black frame to the encoder.
// Allocating a new frame and setting it to black every time costs
// performance that can be avoided. We don't handle resolution changes
// while disabled for now.
return;
}
disabled_frame_sent_ = true;
} else {
disabled_frame_sent_ = false;
}
++mLength; // Atomic
nsCOMPtr<nsIRunnable> runnable =
NewRunnableMethod<StorensRefPtrPassByPtr<Image>, bool>(
this, &VideoFrameConverter::ProcessVideoFrame,
aChunk.mFrame.GetImage(), forceBlack);
mTaskQueue->Dispatch(runnable.forget());
}
void AddListener(VideoConverterListener* aListener)
{
MutexAutoLock lock(mMutex);
MOZ_ASSERT(!mListeners.Contains(aListener));
mListeners.AppendElement(aListener);
}
bool RemoveListener(VideoConverterListener* aListener)
{
MutexAutoLock lock(mMutex);
return mListeners.RemoveElement(aListener);
}
void Shutdown()
{
mTaskQueue->BeginShutdown();
mTaskQueue->AwaitShutdownAndIdle();
}
protected:
virtual ~VideoFrameConverter()
{
MOZ_COUNT_DTOR(VideoFrameConverter);
}
void VideoFrameConverted(unsigned char* aVideoFrame,
unsigned int aVideoFrameLength,
unsigned short aWidth,
unsigned short aHeight,
VideoType aVideoType,
uint64_t aCaptureTime)
{
MutexAutoLock lock(mMutex);
for (RefPtr<VideoConverterListener>& listener : mListeners) {
listener->OnVideoFrameConverted(aVideoFrame, aVideoFrameLength,
aWidth, aHeight, aVideoType, aCaptureTime);
}
}
void VideoFrameConverted(webrtc::I420VideoFrame& aVideoFrame)
{
MutexAutoLock lock(mMutex);
for (RefPtr<VideoConverterListener>& listener : mListeners) {
listener->OnVideoFrameConverted(aVideoFrame);
}
}
void ProcessVideoFrame(Image* aImage, bool aForceBlack)
{
--mLength; // Atomic
MOZ_ASSERT(mLength >= 0);
if (aForceBlack) {
IntSize size = aImage->GetSize();
uint32_t yPlaneLen = YSIZE(size.width, size.height);
uint32_t cbcrPlaneLen = 2 * CRSIZE(size.width, size.height);
uint32_t length = yPlaneLen + cbcrPlaneLen;
// Send a black image.
auto pixelData = MakeUniqueFallible<uint8_t[]>(length);
if (pixelData) {
// YCrCb black = 0x10 0x80 0x80
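// (These are limited/"video range" levels: Y=0x10 is black and
// Cb=Cr=0x80 is the neutral chroma value.)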
memset(pixelData.get(), 0x10, yPlaneLen);
// Fill Cb/Cr planes
memset(pixelData.get() + yPlaneLen, 0x80, cbcrPlaneLen);
MOZ_MTLOG(ML_DEBUG, "Sending a black video frame");
VideoFrameConverted(pixelData.get(), length, size.width, size.height,
mozilla::kVideoI420, 0);
}
return;
}
ImageFormat format = aImage->GetFormat();
#ifdef WEBRTC_GONK
GrallocImage* nativeImage = aImage->AsGrallocImage();
if (nativeImage) {
android::sp<android::GraphicBuffer> graphicBuffer = nativeImage->GetGraphicBuffer();
int pixelFormat = graphicBuffer->getPixelFormat(); /* PixelFormat is an enum == int */
mozilla::VideoType destFormat;
switch (pixelFormat) {
case HAL_PIXEL_FORMAT_YV12:
// All Android devices must support this format.
destFormat = mozilla::kVideoYV12;
break;
case GrallocImage::HAL_PIXEL_FORMAT_YCbCr_420_SP:
destFormat = mozilla::kVideoNV21;
break;
case GrallocImage::HAL_PIXEL_FORMAT_YCbCr_420_P:
destFormat = mozilla::kVideoI420;
break;
default:
// XXX Bug NNNNNNN
// use http://dxr.mozilla.org/mozilla-central/source/content/media/omx/I420ColorConverterHelper.cpp
// to convert unknown types (OEM-specific) to I420
MOZ_MTLOG(ML_ERROR, "Un-handled GRALLOC buffer type:" << pixelFormat);
MOZ_CRASH();
}
void *basePtr;
graphicBuffer->lock(android::GraphicBuffer::USAGE_SW_READ_MASK, &basePtr);
uint32_t width = graphicBuffer->getWidth();
uint32_t height = graphicBuffer->getHeight();
// XXX The gralloc buffer's width and stride can differ depending on the implementation.
if (destFormat != mozilla::kVideoI420) {
unsigned char *video_frame = static_cast<unsigned char*>(basePtr);
webrtc::I420VideoFrame i420_frame;
int stride_y = width;
int stride_uv = (width + 1) / 2;
int target_width = width;
int target_height = height;
if (i420_frame.CreateEmptyFrame(target_width,
abs(target_height),
stride_y,
stride_uv, stride_uv) < 0) {
MOZ_ASSERT(false, "Can't allocate empty i420frame");
return;
}
webrtc::VideoType commonVideoType =
webrtc::RawVideoTypeToCommonVideoVideoType(
static_cast<webrtc::RawVideoType>((int)destFormat));
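// Note: this cast relies on the mozilla::VideoType enumerator values
// lining up with webrtc::RawVideoType; if the two enums ever diverge
// this silently converts with the wrong source format.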
if (ConvertToI420(commonVideoType, video_frame, 0, 0, width, height,
I420SIZE(width, height), webrtc::kVideoRotation_0,
&i420_frame)) {
MOZ_ASSERT(false, "Can't convert video type for sending to I420");
return;
}
i420_frame.set_ntp_time_ms(0);
VideoFrameConverted(i420_frame);
} else {
VideoFrameConverted(static_cast<unsigned char*>(basePtr),
I420SIZE(width, height),
width,
height,
destFormat, 0);
}
graphicBuffer->unlock();
return;
} else
#endif
if (format == ImageFormat::PLANAR_YCBCR) {
// Cast away constness b/c some of the accessors are non-const
PlanarYCbCrImage* yuv = const_cast<PlanarYCbCrImage *>(
static_cast<const PlanarYCbCrImage *>(aImage));
const PlanarYCbCrData *data = yuv->GetData();
if (data) {
uint8_t *y = data->mYChannel;
uint8_t *cb = data->mCbChannel;
uint8_t *cr = data->mCrChannel;
int32_t yStride = data->mYStride;
int32_t cbCrStride = data->mCbCrStride;
uint32_t width = yuv->GetSize().width;
uint32_t height = yuv->GetSize().height;
webrtc::I420VideoFrame i420_frame;
int rv = i420_frame.CreateFrame(y, cb, cr, width, height,
yStride, cbCrStride, cbCrStride,
webrtc::kVideoRotation_0);
if (rv != 0) {
NS_ERROR("Creating an I420 frame failed");
return;
}
MOZ_MTLOG(ML_DEBUG, "Sending an I420 video frame");
VideoFrameConverted(i420_frame);
return;
}
}
RefPtr<SourceSurface> surf = aImage->GetAsSourceSurface();
if (!surf) {
MOZ_MTLOG(ML_ERROR, "Getting surface from " << Stringify(format) << " image failed");
return;
}
RefPtr<DataSourceSurface> data = surf->GetDataSurface();
if (!data) {
MOZ_MTLOG(ML_ERROR, "Getting data surface from " << Stringify(format)
<< " image with " << Stringify(surf->GetType()) << "("
<< Stringify(surf->GetFormat()) << ") surface failed");
return;
}
IntSize size = aImage->GetSize();
int half_width = (size.width + 1) >> 1;
int half_height = (size.height + 1) >> 1;
int c_size = half_width * half_height;
int buffer_size = YSIZE(size.width, size.height) + 2 * c_size;
auto yuv_scoped = MakeUniqueFallible<uint8[]>(buffer_size);
if (!yuv_scoped) {
return;
}
uint8* yuv = yuv_scoped.get();
DataSourceSurface::ScopedMap map(data, DataSourceSurface::READ);
if (!map.IsMapped()) {
MOZ_MTLOG(ML_ERROR, "Reading DataSourceSurface from " << Stringify(format)
<< " image with " << Stringify(surf->GetType()) << "("
<< Stringify(surf->GetFormat()) << ") surface failed");
return;
}
int rv;
int cb_offset = YSIZE(size.width, size.height);
int cr_offset = cb_offset + c_size;
switch (surf->GetFormat()) {
case SurfaceFormat::B8G8R8A8:
case SurfaceFormat::B8G8R8X8:
rv = libyuv::ARGBToI420(static_cast<uint8*>(map.GetData()),
map.GetStride(),
yuv, size.width,
yuv + cb_offset, half_width,
yuv + cr_offset, half_width,
size.width, size.height);
break;
case SurfaceFormat::R5G6B5_UINT16:
rv = libyuv::RGB565ToI420(static_cast<uint8*>(map.GetData()),
map.GetStride(),
yuv, size.width,
yuv + cb_offset, half_width,
yuv + cr_offset, half_width,
size.width, size.height);
break;
default:
MOZ_MTLOG(ML_ERROR, "Unsupported RGB video format " << Stringify(surf->GetFormat()));
MOZ_ASSERT(PR_FALSE);
return;
}
if (rv != 0) {
MOZ_MTLOG(ML_ERROR, Stringify(surf->GetFormat()) << " to I420 conversion failed");
return;
}
MOZ_MTLOG(ML_DEBUG, "Sending an I420 video frame converted from " <<
Stringify(surf->GetFormat()));
VideoFrameConverted(yuv, buffer_size, size.width, size.height, mozilla::kVideoI420, 0);
}
Atomic<int32_t, Relaxed> mLength;
RefPtr<TaskQueue> mTaskQueue;
// Written and read from the queueing thread (normally MSG).
int32_t last_img_; // serial number of last Image
bool disabled_frame_sent_; // If a black frame has been sent after disabling.
#ifdef DEBUG
uint32_t mThrottleCount;
uint32_t mThrottleRecord;
#endif
// mMutex guards the below variables.
Mutex mMutex;
nsTArray<RefPtr<VideoConverterListener>> mListeners;
};
#endif
// An async inserter for audio data, to avoid running audio codec encoders
// on the MSG/input audio thread. Basically just bounces all the audio
// data to a single audio processing/input queue. We could if we wanted to
// use multiple threads and a TaskQueue.
class AudioProxyThread
{
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)
explicit AudioProxyThread(AudioSessionConduit *aConduit)
: mConduit(aConduit)
{
MOZ_ASSERT(mConduit);
MOZ_COUNT_CTOR(AudioProxyThread);
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
// Use only 1 thread; also forces FIFO operation
// We could use multiple threads, but that may be dicier with the webrtc.org
// code. If so, we'd need to use TaskQueues like the video frame converter.
RefPtr<SharedThreadPool> pool =
SharedThreadPool::Get(NS_LITERAL_CSTRING("AudioProxy"), 1);
mThread = pool.get();
#else
nsCOMPtr<nsIThread> thread;
if (!NS_WARN_IF(NS_FAILED(NS_NewNamedThread("AudioProxy", getter_AddRefs(thread))))) {
mThread = thread;
}
#endif
}
// called on mThread
void InternalProcessAudioChunk(
TrackRate rate,
AudioChunk& chunk,
bool enabled) {
// Convert to interleaved, 16-bit integer audio, with a maximum of two
// channels (since the webrtc.org code below assumes that the input
// audio is either mono or stereo).
uint32_t outputChannels = chunk.ChannelCount() == 1 ? 1 : 2;
const int16_t* samples = nullptr;
UniquePtr<int16_t[]> convertedSamples;
// We take advantage of the fact that in the common case (microphone
// directly to PeerConnection, that is, a normal call) the samples are
// already 16-bit mono, so the interleaved and planar representations are
// identical and we can just use the buffer as-is.
if (enabled && outputChannels == 1 && chunk.mBufferFormat == AUDIO_FORMAT_S16) {
samples = chunk.ChannelData<int16_t>().Elements()[0];
} else {
convertedSamples = MakeUnique<int16_t[]>(chunk.mDuration * outputChannels);
if (!enabled || chunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
PodZero(convertedSamples.get(), chunk.mDuration * outputChannels);
} else if (chunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
DownmixAndInterleave(chunk.ChannelData<float>(),
chunk.mDuration, chunk.mVolume, outputChannels,
convertedSamples.get());
} else if (chunk.mBufferFormat == AUDIO_FORMAT_S16) {
DownmixAndInterleave(chunk.ChannelData<int16_t>(),
chunk.mDuration, chunk.mVolume, outputChannels,
convertedSamples.get());
}
samples = convertedSamples.get();
}
MOZ_ASSERT(!(rate%100)); // rate should be a multiple of 100
// Check if the rate or the number of channels has changed since the last time
// we came through. I realize it may be overkill to check if the rate has
// changed, but I believe it is possible (e.g. if we change sources) and it
// costs us very little to handle this case.
uint32_t audio_10ms = rate / 100;
if (!packetizer_ ||
packetizer_->PacketSize() != audio_10ms ||
packetizer_->Channels() != outputChannels) {
// It's ok to drop the audio still in the packetizer here.
packetizer_ = new AudioPacketizer<int16_t, int16_t>(audio_10ms, outputChannels);
}
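// Example of the sizes involved: at rate = 48000 with stereo input,
// audio_10ms = 480 frames, so each packet carries 480 * 2 = 960 int16_t
// samples, comfortably within the AUDIO_SAMPLE_BUFFER_MAX stack buffer
// used below.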
packetizer_->Input(samples, chunk.mDuration);
while (packetizer_->PacketsAvailable()) {
uint32_t samplesPerPacket = packetizer_->PacketSize() *
packetizer_->Channels();
// We know that webrtc.org's code is going to copy the samples down the line,
// so we can just use a stack buffer here instead of malloc-ing.
int16_t packet[AUDIO_SAMPLE_BUFFER_MAX];
packetizer_->Output(packet);
mConduit->SendAudioFrame(packet, samplesPerPacket, rate, 0);
}
}
void QueueAudioChunk(TrackRate rate, AudioChunk& chunk, bool enabled)
{
RUN_ON_THREAD(mThread,
WrapRunnable(RefPtr<AudioProxyThread>(this),
&AudioProxyThread::InternalProcessAudioChunk,
rate, chunk, enabled),
NS_DISPATCH_NORMAL);
}
protected:
virtual ~AudioProxyThread()
{
// Conduits must be released on MainThread, and we might have the last
// reference. We don't need to worry about runnables still trying to access
// the conduit, since the runnables hold a ref to AudioProxyThread.
NS_ReleaseOnMainThread(mConduit.forget());
MOZ_COUNT_DTOR(AudioProxyThread);
}
RefPtr<AudioSessionConduit> mConduit;
nsCOMPtr<nsIEventTarget> mThread;
// Only accessed on mThread
nsAutoPtr<AudioPacketizer<int16_t, int16_t>> packetizer_;
};
static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp";
MediaPipeline::MediaPipeline(const std::string& pc,
Direction direction,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
const std::string& track_id,
int level,
RefPtr<MediaSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter)
: direction_(direction),
track_id_(track_id),
level_(level),
conduit_(conduit),
rtp_(rtp_transport, rtcp_transport ? RTP : MUX),
rtcp_(rtcp_transport ? rtcp_transport : rtp_transport,
rtcp_transport ? RTCP : MUX),
main_thread_(main_thread),
sts_thread_(sts_thread),
rtp_packets_sent_(0),
rtcp_packets_sent_(0),
rtp_packets_received_(0),
rtcp_packets_received_(0),
rtp_bytes_sent_(0),
rtp_bytes_received_(0),
pc_(pc),
description_(),
filter_(filter),
rtp_parser_(webrtc::RtpHeaderParser::Create()) {
// To indicate rtcp-mux, rtcp_transport should be nullptr.
// It is therefore an error to pass in the same flow for
// both RTP and RTCP.
MOZ_ASSERT(rtp_transport != rtcp_transport);
// PipelineTransport() will access this->sts_thread_; moved here for safety
transport_ = new PipelineTransport(this);
}
MediaPipeline::~MediaPipeline() {
ASSERT_ON_THREAD(main_thread_);
MOZ_MTLOG(ML_INFO, "Destroying MediaPipeline: " << description_);
}
nsresult MediaPipeline::Init() {
ASSERT_ON_THREAD(main_thread_);
if (direction_ == RECEIVE) {
conduit_->SetReceiverTransport(transport_);
} else {
conduit_->SetTransmitterTransport(transport_);
}
RUN_ON_THREAD(sts_thread_,
WrapRunnable(
RefPtr<MediaPipeline>(this),
&MediaPipeline::Init_s),
NS_DISPATCH_NORMAL);
return NS_OK;
}
nsresult MediaPipeline::Init_s() {
ASSERT_ON_THREAD(sts_thread_);
return AttachTransport_s();
}
// Disconnect us from the transport so that we can cleanly destruct the
// pipeline on the main thread. ShutdownMedia_m() must have already been
// called
void
MediaPipeline::DetachTransport_s()
{
ASSERT_ON_THREAD(sts_thread_);
disconnect_all();
transport_->Detach();
rtp_.Detach();
rtcp_.Detach();
}
nsresult
MediaPipeline::AttachTransport_s()
{
ASSERT_ON_THREAD(sts_thread_);
nsresult res;
MOZ_ASSERT(rtp_.transport_);
MOZ_ASSERT(rtcp_.transport_);
res = ConnectTransport_s(rtp_);
if (NS_FAILED(res)) {
return res;
}
if (rtcp_.transport_ != rtp_.transport_) {
res = ConnectTransport_s(rtcp_);
if (NS_FAILED(res)) {
return res;
}
}
transport_->Attach(this);
return NS_OK;
}
void
MediaPipeline::UpdateTransport_m(int level,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter)
{
RUN_ON_THREAD(sts_thread_,
WrapRunnable(
this,
&MediaPipeline::UpdateTransport_s,
level,
rtp_transport,
rtcp_transport,
filter),
NS_DISPATCH_NORMAL);
}
void
MediaPipeline::UpdateTransport_s(int level,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter)
{
bool rtcp_mux = false;
if (!rtcp_transport) {
rtcp_transport = rtp_transport;
rtcp_mux = true;
}
if ((rtp_transport != rtp_.transport_) ||
(rtcp_transport != rtcp_.transport_)) {
DetachTransport_s();
rtp_ = TransportInfo(rtp_transport, rtcp_mux ? MUX : RTP);
rtcp_ = TransportInfo(rtcp_transport, rtcp_mux ? MUX : RTCP);
AttachTransport_s();
}
level_ = level;
if (filter_ && filter) {
// Use the new filter, but don't forget any remote SSRCs that we've learned
// by receiving traffic.
filter_->Update(*filter);
} else {
filter_ = filter;
}
}
void
MediaPipeline::SelectSsrc_m(size_t ssrc_index)
{
RUN_ON_THREAD(sts_thread_,
WrapRunnable(
this,
&MediaPipeline::SelectSsrc_s,
ssrc_index),
NS_DISPATCH_NORMAL);
}
void
MediaPipeline::SelectSsrc_s(size_t ssrc_index)
{
filter_ = new MediaPipelineFilter;
if (ssrc_index < ssrcs_received_.size()) {
filter_->AddRemoteSSRC(ssrcs_received_[ssrc_index]);
} else {
MOZ_MTLOG(ML_WARNING, "SelectSsrc called with " << ssrc_index << " but we "
<< "have only seen " << ssrcs_received_.size()
<< " ssrcs");
}
}
void MediaPipeline::StateChange(TransportFlow *flow, TransportLayer::State state) {
TransportInfo* info = GetTransportInfo_s(flow);
MOZ_ASSERT(info);
if (state == TransportLayer::TS_OPEN) {
MOZ_MTLOG(ML_INFO, "Flow is ready");
TransportReady_s(*info);
} else if (state == TransportLayer::TS_CLOSED ||
state == TransportLayer::TS_ERROR) {
TransportFailed_s(*info);
}
}
static bool MakeRtpTypeToStringArray(const char** array) {
static const char* RTP_str = "RTP";
static const char* RTCP_str = "RTCP";
static const char* MUX_str = "RTP/RTCP mux";
array[MediaPipeline::RTP] = RTP_str;
array[MediaPipeline::RTCP] = RTCP_str;
array[MediaPipeline::MUX] = MUX_str;
return true;
}
static const char* ToString(MediaPipeline::RtpType type) {
static const char* array[(int)MediaPipeline::MAX_RTP_TYPE] = {nullptr};
// Dummy variable to cause init to happen only on first call
static bool dummy = MakeRtpTypeToStringArray(array);
(void)dummy;
return array[type];
}
nsresult MediaPipeline::TransportReady_s(TransportInfo &info) {
MOZ_ASSERT(!description_.empty());
// TODO(ekr@rtfm.com): implement some kind of notification on
// failure. bug 852665.
if (info.state_ != MP_CONNECTING) {
MOZ_MTLOG(ML_ERROR, "Transport ready for flow in wrong state:" <<
description_ << ": " << ToString(info.type_));
return NS_ERROR_FAILURE;
}
MOZ_MTLOG(ML_INFO, "Transport ready for pipeline " <<
static_cast<void *>(this) << " flow " << description_ << ": " <<
ToString(info.type_));
// TODO(bcampen@mozilla.com): Should we disconnect from the flow on failure?
nsresult res;
// Now instantiate the SRTP objects
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
info.transport_->GetLayer(TransportLayerDtls::ID()));
MOZ_ASSERT(dtls); // DTLS is mandatory
uint16_t cipher_suite;
res = dtls->GetSrtpCipher(&cipher_suite);
if (NS_FAILED(res)) {
MOZ_MTLOG(ML_ERROR, "Failed to negotiate DTLS-SRTP. This is an error");
info.state_ = MP_CLOSED;
UpdateRtcpMuxState(info);
return res;
}
// SRTP Key Exporter as per RFC 5764 S 4.2
unsigned char srtp_block[SRTP_TOTAL_KEY_LENGTH * 2];
res = dtls->ExportKeyingMaterial(kDTLSExporterLabel, false, "",
srtp_block, sizeof(srtp_block));
if (NS_FAILED(res)) {
MOZ_MTLOG(ML_ERROR, "Failed to compute DTLS-SRTP keys. This is an error");
info.state_ = MP_CLOSED;
UpdateRtcpMuxState(info);
MOZ_CRASH(); // TODO: Remove once we have enough field experience to
// know it doesn't happen. bug 798797. Note that the
// code after this never executes.
return res;
}
// Slice and dice as per RFC 5764 S 4.2
unsigned char client_write_key[SRTP_TOTAL_KEY_LENGTH];
unsigned char server_write_key[SRTP_TOTAL_KEY_LENGTH];
int offset = 0;
memcpy(client_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
offset += SRTP_MASTER_KEY_LENGTH;
memcpy(server_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
offset += SRTP_MASTER_KEY_LENGTH;
memcpy(client_write_key + SRTP_MASTER_KEY_LENGTH,
srtp_block + offset, SRTP_MASTER_SALT_LENGTH);
offset += SRTP_MASTER_SALT_LENGTH;
memcpy(server_write_key + SRTP_MASTER_KEY_LENGTH,
srtp_block + offset, SRTP_MASTER_SALT_LENGTH);
offset += SRTP_MASTER_SALT_LENGTH;
MOZ_ASSERT(offset == sizeof(srtp_block));
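// The exported block is laid out per RFC 5764 S 4.2 as:
//   client master key | server master key | client salt | server salt
// (with the default AES_CM_128 SRTP suites that is 16 + 16 + 14 + 14
// = 60 bytes); each write key assembled above is key || salt for one side.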
unsigned char *write_key;
unsigned char *read_key;
if (dtls->role() == TransportLayerDtls::CLIENT) {
write_key = client_write_key;
read_key = server_write_key;
} else {
write_key = server_write_key;
read_key = client_write_key;
}
MOZ_ASSERT(!info.send_srtp_ && !info.recv_srtp_);
info.send_srtp_ = SrtpFlow::Create(cipher_suite, false, write_key,
SRTP_TOTAL_KEY_LENGTH);
info.recv_srtp_ = SrtpFlow::Create(cipher_suite, true, read_key,
SRTP_TOTAL_KEY_LENGTH);
if (!info.send_srtp_ || !info.recv_srtp_) {
MOZ_MTLOG(ML_ERROR, "Couldn't create SRTP flow for "
<< ToString(info.type_));
info.state_ = MP_CLOSED;
UpdateRtcpMuxState(info);
return NS_ERROR_FAILURE;
}
MOZ_MTLOG(ML_INFO, "Listening for " << ToString(info.type_)
<< " packets received on " <<
static_cast<void *>(dtls->downward()));
switch (info.type_) {
case RTP:
dtls->downward()->SignalPacketReceived.connect(
this,
&MediaPipeline::RtpPacketReceived);
break;
case RTCP:
dtls->downward()->SignalPacketReceived.connect(
this,
&MediaPipeline::RtcpPacketReceived);
break;
case MUX:
dtls->downward()->SignalPacketReceived.connect(
this,
&MediaPipeline::PacketReceived);
break;
default:
MOZ_CRASH();
}
info.state_ = MP_OPEN;
UpdateRtcpMuxState(info);
return NS_OK;
}
nsresult MediaPipeline::TransportFailed_s(TransportInfo &info) {
ASSERT_ON_THREAD(sts_thread_);
info.state_ = MP_CLOSED;
UpdateRtcpMuxState(info);
MOZ_MTLOG(ML_INFO, "Transport closed for flow " << ToString(info.type_));
NS_WARNING(
"MediaPipeline Transport failed. This is not properly cleaned up yet");
// TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
// connection was good and now it is bad.
// TODO(ekr@rtfm.com): Report up so that the PC knows we
// have experienced an error.
return NS_OK;
}
void MediaPipeline::UpdateRtcpMuxState(TransportInfo &info) {
if (info.type_ == MUX) {
if (info.transport_ == rtcp_.transport_) {
rtcp_.state_ = info.state_;
if (!rtcp_.send_srtp_) {
rtcp_.send_srtp_ = info.send_srtp_;
rtcp_.recv_srtp_ = info.recv_srtp_;
}
}
}
}
nsresult MediaPipeline::SendPacket(TransportFlow *flow, const void *data,
int len) {
ASSERT_ON_THREAD(sts_thread_);
// Note that we bypass the DTLS layer here
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
flow->GetLayer(TransportLayerDtls::ID()));
MOZ_ASSERT(dtls);
TransportResult res = dtls->downward()->
SendPacket(static_cast<const unsigned char *>(data), len);
if (res != len) {
// Ignore blocking indications
if (res == TE_WOULDBLOCK)
return NS_OK;
MOZ_MTLOG(ML_ERROR, "Failed write on stream " << description_);
return NS_BASE_STREAM_CLOSED;
}
return NS_OK;
}
void MediaPipeline::increment_rtp_packets_sent(int32_t bytes) {
++rtp_packets_sent_;
rtp_bytes_sent_ += bytes;
if (!(rtp_packets_sent_ % 100)) {
MOZ_MTLOG(ML_INFO, "RTP sent packet count for " << description_
<< " Pipeline " << static_cast<void *>(this)
<< " Flow : " << static_cast<void *>(rtp_.transport_)
<< ": " << rtp_packets_sent_
<< " (" << rtp_bytes_sent_ << " bytes)");
}
}
void MediaPipeline::increment_rtcp_packets_sent() {
++rtcp_packets_sent_;
if (!(rtcp_packets_sent_ % 100)) {
MOZ_MTLOG(ML_INFO, "RTCP sent packet count for " << description_
<< " Pipeline " << static_cast<void *>(this)
<< " Flow : " << static_cast<void *>(rtcp_.transport_)
<< ": " << rtcp_packets_sent_);
}
}
void MediaPipeline::increment_rtp_packets_received(int32_t bytes) {
++rtp_packets_received_;
rtp_bytes_received_ += bytes;
if (!(rtp_packets_received_ % 100)) {
MOZ_MTLOG(ML_INFO, "RTP received packet count for " << description_
<< " Pipeline " << static_cast<void *>(this)
<< " Flow : " << static_cast<void *>(rtp_.transport_)
<< ": " << rtp_packets_received_
<< " (" << rtp_bytes_received_ << " bytes)");
}
}
void MediaPipeline::increment_rtcp_packets_received() {
++rtcp_packets_received_;
if (!(rtcp_packets_received_ % 100)) {
MOZ_MTLOG(ML_INFO, "RTCP received packet count for " << description_
<< " Pipeline " << static_cast<void *>(this)
<< " Flow : " << static_cast<void *>(rtcp_.transport_)
<< ": " << rtcp_packets_received_);
}
}
void MediaPipeline::RtpPacketReceived(TransportLayer *layer,
const unsigned char *data,
size_t len) {
if (!transport_->pipeline()) {
MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport disconnected");
return;
}
if (!conduit_) {
MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; media disconnected");
return;
}
if (rtp_.state_ != MP_OPEN) {
MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; pipeline not open");
return;
}
if (rtp_.transport_->state() != TransportLayer::TS_OPEN) {
MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport not open");
return;
}
// This should never happen.
MOZ_ASSERT(rtp_.recv_srtp_);
if (direction_ == TRANSMIT) {
return;
}
if (!len) {
return;
}
// Filter out everything but RTP/RTCP
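// (Both RTP and RTCP carry version 2 in the two most significant bits of
// the first byte, so valid packets fall in [128, 191]; DTLS and STUN
// packets fall outside that range and are dropped here.)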
if (data[0] < 128 || data[0] > 191) {
return;
}
webrtc::RTPHeader header;
if (!rtp_parser_->Parse(data, len, &header)) {
return;
}
if (std::find(ssrcs_received_.begin(), ssrcs_received_.end(), header.ssrc) ==
ssrcs_received_.end()) {
ssrcs_received_.push_back(header.ssrc);
}
if (filter_ && !filter_->Filter(header)) {
return;
}
// Make a copy rather than cast away constness
auto inner_data = MakeUnique<unsigned char[]>(len);
memcpy(inner_data.get(), data, len);
int out_len = 0;
nsresult res = rtp_.recv_srtp_->UnprotectRtp(inner_data.get(),
len, len, &out_len);
if (!NS_SUCCEEDED(res)) {
char tmp[16];
SprintfLiteral(tmp, "%.2x %.2x %.2x %.2x",
inner_data[0],
inner_data[1],
inner_data[2],
inner_data[3]);
MOZ_MTLOG(ML_NOTICE, "Error unprotecting RTP in " << description_
<< " len=" << len << " [" << tmp << "...]");
return;
}
MOZ_MTLOG(ML_DEBUG, description_ << " received RTP packet.");
increment_rtp_packets_received(out_len);
(void)conduit_->ReceivedRTPPacket(inner_data.get(), out_len); // Ignore error codes
}
void MediaPipeline::RtcpPacketReceived(TransportLayer *layer,
const unsigned char *data,
size_t len) {
if (!transport_->pipeline()) {
MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; transport disconnected");
return;
}
if (!conduit_) {
MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; media disconnected");
return;
}
if (rtcp_.state_ != MP_OPEN) {
MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; pipeline not open");
return;
}
if (rtcp_.transport_->state() != TransportLayer::TS_OPEN) {
MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport not open");
return;
}
if (!len) {
return;
}
// Filter out everything but RTP/RTCP
if (data[0] < 128 || data[0] > 191) {
return;
}
// We do not filter RTCP for send pipelines, since the webrtc.org code for
// senders already has logic to ignore RRs that do not apply.
// TODO bug 1279153: remove SR check for reduced size RTCP
if (filter_ && direction_ == RECEIVE) {
if (!filter_->FilterSenderReport(data, len)) {
MOZ_MTLOG(ML_NOTICE, "Dropping incoming RTCP packet; filtered out");
return;
}
}
// Make a copy rather than cast away constness
auto inner_data = MakeUnique<unsigned char[]>(len);
memcpy(inner_data.get(), data, len);
int out_len;
nsresult res = rtcp_.recv_srtp_->UnprotectRtcp(inner_data.get(),
len,
len,
&out_len);
if (!NS_SUCCEEDED(res))
return;
MOZ_MTLOG(ML_DEBUG, description_ << " received RTCP packet.");
increment_rtcp_packets_received();
MOZ_ASSERT(rtcp_.recv_srtp_); // This should never happen
(void)conduit_->ReceivedRTCPPacket(inner_data.get(), out_len); // Ignore error codes
}
bool MediaPipeline::IsRtp(const unsigned char *data, size_t len) {
if (len < 2)
return false;
// Check if this is an RTCP packet. Logic based on the types listed in
// media/webrtc/trunk/src/modules/rtp_rtcp/source/rtp_utility.cc
// Anything outside this range is RTP.
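// For reference, the payload types tested below: 192 = FIR and
// 193 = NACK (the deprecated RFC 2032 types), 195 = IJ (extended
// inter-arrival jitter report), and 200-207 = SR, RR, SDES, BYE, APP,
// RTPFB, PSFB and XR.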
if ((data[1] < 192) || (data[1] > 207))
return true;
if (data[1] == 192) // FIR
return false;
if (data[1] == 193) // NACK, but could also be RTP. This makes us sad
return true; // but it's how webrtc.org behaves.
if (data[1] == 194)
return true;
if (data[1] == 195) // IJ.
return false;
if ((data[1] > 195) && (data[1] < 200)) // the > 195 is redundant
return true;
if ((data[1] >= 200) && (data[1] <= 207)) // SR, RR, SDES, BYE,
return false; // APP, RTPFB, PSFB, XR
MOZ_ASSERT(false); // Not reached, belt and suspenders.
return true;
}
void MediaPipeline::PacketReceived(TransportLayer *layer,
const unsigned char *data,
size_t len) {
if (!transport_->pipeline()) {
MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; transport disconnected");
return;
}
if (IsRtp(data, len)) {
RtpPacketReceived(layer, data, len);
} else {
RtcpPacketReceived(layer, data, len);
}
}
class MediaPipelineTransmit::PipelineListener
: public DirectMediaStreamTrackListener
{
friend class MediaPipelineTransmit;
public:
explicit PipelineListener(const RefPtr<MediaSessionConduit>& conduit)
: conduit_(conduit),
track_id_(TRACK_INVALID),
mMutex("MediaPipelineTransmit::PipelineListener"),
track_id_external_(TRACK_INVALID),
active_(false),
enabled_(false),
direct_connect_(false)
{
}
~PipelineListener()
{
if (!NS_IsMainThread()) {
// release conduit on mainthread. Must use forget()!
nsresult rv = NS_DispatchToMainThread(new
ConduitDeleteEvent(conduit_.forget()));
MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main");
if (NS_FAILED(rv)) {
MOZ_CRASH();
}
} else {
conduit_ = nullptr;
}
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
if (converter_) {
converter_->Shutdown();
}
#endif
}
// Dispatches setting the internal TrackID to TRACK_INVALID to the media
// graph thread to keep it in sync with other MediaStreamGraph operations
// like RemoveListener() and AddListener(). The TrackID will be updated on
// the next NewData() callback.
void UnsetTrackId(MediaStreamGraphImpl* graph);
void SetActive(bool active) { active_ = active; }
void SetEnabled(bool enabled) { enabled_ = enabled; }
// These are needed since nested classes don't have access to any particular
// instance of the parent
void SetAudioProxy(const RefPtr<AudioProxyThread>& proxy)
{
audio_processing_ = proxy;
}
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
void SetVideoFrameConverter(const RefPtr<VideoFrameConverter>& converter)
{
converter_ = converter;
}
void OnVideoFrameConverted(unsigned char* aVideoFrame,
unsigned int aVideoFrameLength,
unsigned short aWidth,
unsigned short aHeight,
VideoType aVideoType,
uint64_t aCaptureTime)
{
MOZ_ASSERT(conduit_->type() == MediaSessionConduit::VIDEO);
static_cast<VideoSessionConduit*>(conduit_.get())->SendVideoFrame(
aVideoFrame, aVideoFrameLength, aWidth, aHeight, aVideoType, aCaptureTime);
}
void OnVideoFrameConverted(webrtc::I420VideoFrame& aVideoFrame)
{
MOZ_ASSERT(conduit_->type() == MediaSessionConduit::VIDEO);
static_cast<VideoSessionConduit*>(conduit_.get())->SendVideoFrame(aVideoFrame);
}
#endif
// Implement MediaStreamTrackListener
void NotifyQueuedChanges(MediaStreamGraph* aGraph,
StreamTime aTrackOffset,
const MediaSegment& aQueuedMedia) override;
// Implement DirectMediaStreamTrackListener
void NotifyRealtimeTrackData(MediaStreamGraph* aGraph,
StreamTime aTrackOffset,
const MediaSegment& aMedia) override;
void NotifyDirectListenerInstalled(InstallationResult aResult) override;
void NotifyDirectListenerUninstalled() override;
private:
void UnsetTrackIdImpl() {
MutexAutoLock lock(mMutex);
track_id_ = track_id_external_ = TRACK_INVALID;
}
void NewData(MediaStreamGraph* graph,
StreamTime offset,
const MediaSegment& media);
RefPtr<MediaSessionConduit> conduit_;
RefPtr<AudioProxyThread> audio_processing_;
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
RefPtr<VideoFrameConverter> converter_;
#endif
// May be TRACK_INVALID until we see data from the track
TrackID track_id_; // this is the current TrackID this listener is attached to
Mutex mMutex;
// protected by mMutex
// May be TRACK_INVALID until we see data from the track
TrackID track_id_external_; // this is queried from other threads
// active is true if there is a transport to send on
mozilla::Atomic<bool> active_;
// enabled is true if the media access control permits sending
// actual content; when false you get black/silence
mozilla::Atomic<bool> enabled_;
// Written and read on the MediaStreamGraph thread
bool direct_connect_;
};
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
// Implements VideoConverterListener for MediaPipeline.
//
// We pass converted frames on to MediaPipelineTransmit::PipelineListener
// where they are further forwarded to VideoConduit.
// MediaPipelineTransmit calls Detach() during shutdown to ensure there are
// no cyclic dependencies between us and PipelineListener.
class MediaPipelineTransmit::VideoFrameFeeder
: public VideoConverterListener
{
public:
explicit VideoFrameFeeder(const RefPtr<PipelineListener>& listener)
: listener_(listener),
mutex_("VideoFrameFeeder")
{
MOZ_COUNT_CTOR(VideoFrameFeeder);
}
void Detach()
{
MutexAutoLock lock(mutex_);
listener_ = nullptr;
}
void OnVideoFrameConverted(unsigned char* aVideoFrame,
unsigned int aVideoFrameLength,
unsigned short aWidth,
unsigned short aHeight,
VideoType aVideoType,
uint64_t aCaptureTime) override
{
MutexAutoLock lock(mutex_);
if (!listener_) {
return;
}
listener_->OnVideoFrameConverted(aVideoFrame, aVideoFrameLength,
aWidth, aHeight, aVideoType, aCaptureTime);
}
void OnVideoFrameConverted(webrtc::I420VideoFrame& aVideoFrame) override
{
MutexAutoLock lock(mutex_);
if (!listener_) {
return;
}
listener_->OnVideoFrameConverted(aVideoFrame);
}
protected:
virtual ~VideoFrameFeeder()
{
MOZ_COUNT_DTOR(VideoFrameFeeder);
}
RefPtr<PipelineListener> listener_;
Mutex mutex_;
};
#endif
class MediaPipelineTransmit::PipelineVideoSink :
public MediaStreamVideoSink
{
public:
explicit PipelineVideoSink(const RefPtr<MediaSessionConduit>& conduit,
MediaPipelineTransmit::PipelineListener* listener)
: conduit_(conduit)
, pipelineListener_(listener)
{
}
virtual void SetCurrentFrames(const VideoSegment& aSegment) override;
virtual void ClearFrames() override {}
private:
~PipelineVideoSink() {
// release conduit on mainthread. Must use forget()!
nsresult rv = NS_DispatchToMainThread(new
ConduitDeleteEvent(conduit_.forget()));
MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main");
if (NS_FAILED(rv)) {
MOZ_CRASH();
}
}
RefPtr<MediaSessionConduit> conduit_;
MediaPipelineTransmit::PipelineListener* pipelineListener_;
};
MediaPipelineTransmit::MediaPipelineTransmit(
const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
dom::MediaStreamTrack* domtrack,
const std::string& track_id,
int level,
RefPtr<MediaSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter) :
MediaPipeline(pc, TRANSMIT, main_thread, sts_thread, track_id, level,
conduit, rtp_transport, rtcp_transport, filter),
listener_(new PipelineListener(conduit)),
video_sink_(new PipelineVideoSink(conduit, listener_)),
domtrack_(domtrack)
{
if (!IsVideo()) {
audio_processing_ = MakeAndAddRef<AudioProxyThread>(static_cast<AudioSessionConduit*>(conduit.get()));
listener_->SetAudioProxy(audio_processing_);
}
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
else { // Video
// For video we send frames to an async VideoFrameConverter that calls
// back to a VideoFrameFeeder that feeds I420 frames to VideoConduit.
feeder_ = MakeAndAddRef<VideoFrameFeeder>(listener_);
converter_ = MakeAndAddRef<VideoFrameConverter>();
converter_->AddListener(feeder_);
listener_->SetVideoFrameConverter(converter_);
}
#endif
}
MediaPipelineTransmit::~MediaPipelineTransmit()
{
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
if (feeder_) {
feeder_->Detach();
}
#endif
}
nsresult MediaPipelineTransmit::Init() {
AttachToTrack(track_id_);
return MediaPipeline::Init();
}
void MediaPipelineTransmit::AttachToTrack(const std::string& track_id) {
ASSERT_ON_THREAD(main_thread_);
description_ = pc_ + "| ";
description_ += conduit_->type() == MediaSessionConduit::AUDIO ?
"Transmit audio[" : "Transmit video[";
description_ += track_id;
description_ += "]";
// TODO(ekr@rtfm.com): Check for errors
MOZ_MTLOG(ML_DEBUG, "Attaching pipeline to track "
<< static_cast<void *>(domtrack_) << " conduit type=" <<
(conduit_->type() == MediaSessionConduit::AUDIO ?"audio":"video"));
// Register the Listener directly with the source if we can.
// We also register it as a non-direct listener so we fall back to that
// if installing the direct listener fails. As a direct listener we get access
// to direct unqueued (and not resampled) data.
domtrack_->AddDirectListener(listener_);
domtrack_->AddListener(listener_);
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
domtrack_->AddDirectListener(video_sink_);
#endif
#ifndef MOZILLA_INTERNAL_API
// this enables the unit tests that can't fiddle with principals and the like
listener_->SetEnabled(true);
#endif
}
bool
MediaPipelineTransmit::IsVideo() const
{
return !!domtrack_->AsVideoStreamTrack();
}
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
void MediaPipelineTransmit::UpdateSinkIdentity_m(MediaStreamTrack* track,
nsIPrincipal* principal,
const PeerIdentity* sinkIdentity) {
ASSERT_ON_THREAD(main_thread_);
if (track != nullptr && track != domtrack_) {
// If a track is specified, then it might not be for this pipeline,
// since we receive notifications for all tracks on the PC.
// nullptr means that the PeerIdentity has changed and shall be applied
// to all tracks of the PC.
return;
}
bool enableTrack = principal->Subsumes(domtrack_->GetPrincipal());
if (!enableTrack) {
// first try didn't work, but there's a chance that this is still available
// if our track is bound to a peerIdentity, and the peer connection (our
// sink) is bound to the same identity, then we can enable the track.
const PeerIdentity* trackIdentity = domtrack_->GetPeerIdentity();
if (sinkIdentity && trackIdentity) {
enableTrack = (*sinkIdentity == *trackIdentity);
}
}
listener_->SetEnabled(enableTrack);
}
#endif
void
MediaPipelineTransmit::DetachMedia()
{
ASSERT_ON_THREAD(main_thread_);
if (domtrack_) {
domtrack_->RemoveDirectListener(listener_);
domtrack_->RemoveListener(listener_);
domtrack_->RemoveDirectListener(video_sink_);
domtrack_ = nullptr;
}
// Let the listener be destroyed with the pipeline (or later).
}
nsresult MediaPipelineTransmit::TransportReady_s(TransportInfo &info) {
ASSERT_ON_THREAD(sts_thread_);
// Call base ready function.
MediaPipeline::TransportReady_s(info);
// Should not be set for a transmitter
if (&info == &rtp_) {
listener_->SetActive(true);
}
return NS_OK;
}
nsresult MediaPipelineTransmit::ReplaceTrack(MediaStreamTrack& domtrack) {
// MainThread, checked in calls we make
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
nsString nsTrackId;
domtrack.GetId(nsTrackId);
std::string track_id(NS_ConvertUTF16toUTF8(nsTrackId).get());
#else
std::string track_id = domtrack.GetId();
#endif
MOZ_MTLOG(ML_DEBUG, "Reattaching pipeline " << description_ << " to track "
<< static_cast<void *>(&domtrack)
<< " track " << track_id << " conduit type=" <<
(conduit_->type() == MediaSessionConduit::AUDIO ?"audio":"video"));
DetachMedia();
domtrack_ = &domtrack; // Detach clears it
// Unsets the track id after RemoveListener() takes effect.
listener_->UnsetTrackId(domtrack_->GraphImpl());
track_id_ = track_id;
AttachToTrack(track_id);
return NS_OK;
}
void MediaPipeline::DisconnectTransport_s(TransportInfo &info) {
MOZ_ASSERT(info.transport_);
ASSERT_ON_THREAD(sts_thread_);
info.transport_->SignalStateChange.disconnect(this);
// We do this even if we're a transmitter, since we are still possibly
// registered to receive RTCP.
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
info.transport_->GetLayer(TransportLayerDtls::ID()));
MOZ_ASSERT(dtls); // DTLS is mandatory
MOZ_ASSERT(dtls->downward());
dtls->downward()->SignalPacketReceived.disconnect(this);
}
nsresult MediaPipeline::ConnectTransport_s(TransportInfo &info) {
MOZ_ASSERT(info.transport_);
ASSERT_ON_THREAD(sts_thread_);
// Look to see if the transport is ready
if (info.transport_->state() == TransportLayer::TS_OPEN) {
nsresult res = TransportReady_s(info);
if (NS_FAILED(res)) {
MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res="
<< static_cast<uint32_t>(res) << " in " << __FUNCTION__);
return res;
}
} else if (info.transport_->state() == TransportLayer::TS_ERROR) {
MOZ_MTLOG(ML_ERROR, ToString(info.type_)
<< "transport is already in error state");
TransportFailed_s(info);
return NS_ERROR_FAILURE;
}
info.transport_->SignalStateChange.connect(this,
&MediaPipeline::StateChange);
return NS_OK;
}
MediaPipeline::TransportInfo* MediaPipeline::GetTransportInfo_s(
TransportFlow *flow) {
ASSERT_ON_THREAD(sts_thread_);
if (flow == rtp_.transport_) {
return &rtp_;
}
if (flow == rtcp_.transport_) {
return &rtcp_;
}
return nullptr;
}
nsresult MediaPipeline::PipelineTransport::SendRtpPacket(
const void *data, int len) {
nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t *>(data),
len, len + SRTP_MAX_EXPANSION));
RUN_ON_THREAD(sts_thread_,
WrapRunnable(
RefPtr<MediaPipeline::PipelineTransport>(this),
&MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s,
buf, true),
NS_DISPATCH_NORMAL);
return NS_OK;
}
nsresult MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s(
nsAutoPtr<DataBuffer> data,
bool is_rtp) {
ASSERT_ON_THREAD(sts_thread_);
if (!pipeline_) {
return NS_OK; // Detached
}
TransportInfo& transport = is_rtp ? pipeline_->rtp_ : pipeline_->rtcp_;
if (!transport.send_srtp_) {
MOZ_MTLOG(ML_DEBUG, "Couldn't write RTP/RTCP packet; SRTP not set up yet");
return NS_OK;
}
MOZ_ASSERT(transport.transport_);
NS_ENSURE_TRUE(transport.transport_, NS_ERROR_NULL_POINTER);
// libsrtp enciphers in place, so we need a big enough buffer.
MOZ_ASSERT(data->capacity() >= data->len() + SRTP_MAX_EXPANSION);
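// (SRTP_MAX_EXPANSION is assumed here to cover the worst-case growth of a
// protected packet, i.e. the SRTP/SRTCP auth tag plus the 4-byte SRTCP
// index, so protecting in place cannot overrun the buffer.)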
int out_len;
nsresult res;
if (is_rtp) {
res = transport.send_srtp_->ProtectRtp(data->data(),
data->len(),
data->capacity(),
&out_len);
} else {
res = transport.send_srtp_->ProtectRtcp(data->data(),
data->len(),
data->capacity(),
&out_len);
}
if (!NS_SUCCEEDED(res)) {
return res;
}
// paranoia; don't have uninitialized bytes included in data->len()
data->SetLength(out_len);
MOZ_MTLOG(ML_DEBUG, pipeline_->description_ << " sending " <<
(is_rtp ? "RTP" : "RTCP") << " packet");
if (is_rtp) {
pipeline_->increment_rtp_packets_sent(out_len);
} else {
pipeline_->increment_rtcp_packets_sent();
}
return pipeline_->SendPacket(transport.transport_, data->data(), out_len);
}
nsresult MediaPipeline::PipelineTransport::SendRtcpPacket(
const void *data, int len) {
nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t *>(data),
len, len + SRTP_MAX_EXPANSION));
RUN_ON_THREAD(sts_thread_,
WrapRunnable(
RefPtr<MediaPipeline::PipelineTransport>(this),
&MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s,
buf, false),
NS_DISPATCH_NORMAL);
return NS_OK;
}
void MediaPipelineTransmit::PipelineListener::
UnsetTrackId(MediaStreamGraphImpl* graph) {
#ifndef USE_FAKE_MEDIA_STREAMS
class Message : public ControlMessage {
public:
explicit Message(PipelineListener* listener) :
ControlMessage(nullptr), listener_(listener) {}
virtual void Run() override
{
listener_->UnsetTrackIdImpl();
}
RefPtr<PipelineListener> listener_;
};
graph->AppendMessage(MakeUnique<Message>(this));
#else
UnsetTrackIdImpl();
#endif
}
// Called if we're attached with AddDirectListener()
void MediaPipelineTransmit::PipelineListener::
NotifyRealtimeTrackData(MediaStreamGraph* graph,
StreamTime offset,
const MediaSegment& media) {
MOZ_MTLOG(ML_DEBUG, "MediaPipeline::NotifyRealtimeTrackData() listener=" <<
this << ", offset=" << offset <<
", duration=" << media.GetDuration());
NewData(graph, offset, media);
}
void MediaPipelineTransmit::PipelineListener::
NotifyQueuedChanges(MediaStreamGraph* graph,
StreamTime offset,
const MediaSegment& queued_media) {
MOZ_MTLOG(ML_DEBUG, "MediaPipeline::NotifyQueuedChanges()");
// ignore non-direct data if we're also getting direct data
if (!direct_connect_) {
NewData(graph, offset, queued_media);
}
}
void MediaPipelineTransmit::PipelineListener::
NotifyDirectListenerInstalled(InstallationResult aResult) {
MOZ_MTLOG(ML_INFO, "MediaPipeline::NotifyDirectListenerInstalled() listener= " <<
this << ", result=" << static_cast<int32_t>(aResult));
direct_connect_ = InstallationResult::SUCCESS == aResult;
}
void MediaPipelineTransmit::PipelineListener::
NotifyDirectListenerUninstalled() {
MOZ_MTLOG(ML_INFO, "MediaPipeline::NotifyDirectListenerUninstalled() listener=" << this);
direct_connect_ = false;
}
void MediaPipelineTransmit::PipelineListener::
NewData(MediaStreamGraph* graph,
StreamTime offset,
const MediaSegment& media) {
if (!active_) {
MOZ_MTLOG(ML_DEBUG, "Discarding packets because transport not ready");
return;
}
if (conduit_->type() !=
(media.GetType() == MediaSegment::AUDIO ? MediaSessionConduit::AUDIO :
MediaSessionConduit::VIDEO)) {
MOZ_ASSERT(false, "The media type should always be correct since the "
"listener is locked to a specific track");
return;
}
// TODO(ekr@rtfm.com): For now assume that we have only one
// track type and it's destined for us
// See bug 784517
if (media.GetType() == MediaSegment::AUDIO) {
AudioSegment* audio = const_cast<AudioSegment *>(
static_cast<const AudioSegment *>(&media));
AudioSegment::ChunkIterator iter(*audio);
while(!iter.IsEnded()) {
TrackRate rate;
#ifdef USE_FAKE_MEDIA_STREAMS
rate = Fake_MediaStream::GraphRate();
#else
rate = graph->GraphRate();
#endif
audio_processing_->QueueAudioChunk(rate, *iter, enabled_);
iter.Next();
}
} else {
// Ignore
}
}
void MediaPipelineTransmit::PipelineVideoSink::
SetCurrentFrames(const VideoSegment& aSegment)
{
MOZ_ASSERT(pipelineListener_);
if (!pipelineListener_->active_) {
MOZ_MTLOG(ML_DEBUG, "Discarding packets because transport not ready");
return;
}
if (conduit_->type() != MediaSessionConduit::VIDEO) {
// Ignore data of wrong kind in case we have a muxed stream
return;
}
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
VideoSegment* video = const_cast<VideoSegment *>(&aSegment);
VideoSegment::ChunkIterator iter(*video);
while(!iter.IsEnded()) {
pipelineListener_->converter_->QueueVideoChunk(*iter, !pipelineListener_->enabled_);
iter.Next();
}
#endif
}
class TrackAddedCallback {
public:
virtual void TrackAdded(TrackTicks current_ticks) = 0;
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TrackAddedCallback);
protected:
virtual ~TrackAddedCallback() {}
};
class GenericReceiveListener;
class GenericReceiveCallback : public TrackAddedCallback
{
public:
explicit GenericReceiveCallback(GenericReceiveListener* listener)
: listener_(listener) {}
void TrackAdded(TrackTicks time);
private:
RefPtr<GenericReceiveListener> listener_;
};
// Add a listener on the MSG thread using the MSG command queue
static void AddListener(MediaStream* source, MediaStreamListener* listener) {
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
class Message : public ControlMessage {
public:
Message(MediaStream* stream, MediaStreamListener* listener)
: ControlMessage(stream),
listener_(listener) {}
virtual void Run() override {
mStream->AddListenerImpl(listener_.forget());
}
private:
RefPtr<MediaStreamListener> listener_;
};
MOZ_ASSERT(listener);
source->GraphImpl()->AppendMessage(MakeUnique<Message>(source, listener));
#else
source->AddListener(listener);
#endif
}
class GenericReceiveListener : public MediaStreamListener
{
public:
GenericReceiveListener(SourceMediaStream *source, TrackID track_id)
: source_(source),
track_id_(track_id),
played_ticks_(0),
principal_handle_(PRINCIPAL_HANDLE_NONE) {}
virtual ~GenericReceiveListener() {}
void AddSelf()
{
AddListener(source_, this);
}
void EndTrack()
{
source_->EndTrack(track_id_);
}
#ifndef USE_FAKE_MEDIA_STREAMS
// Must be called on the main thread
void SetPrincipalHandle_m(const PrincipalHandle& principal_handle)
{
class Message : public ControlMessage
{
public:
Message(GenericReceiveListener* listener,
MediaStream* stream,
const PrincipalHandle& principal_handle)
: ControlMessage(stream),
listener_(listener),
principal_handle_(principal_handle)
{}
void Run() override {
listener_->SetPrincipalHandle_msg(principal_handle_);
}
RefPtr<GenericReceiveListener> listener_;
PrincipalHandle principal_handle_;
};
source_->GraphImpl()->AppendMessage(MakeUnique<Message>(this, source_, principal_handle));
}
// Must be called on the MediaStreamGraph thread
void SetPrincipalHandle_msg(const PrincipalHandle& principal_handle)
{
principal_handle_ = principal_handle;
}
#endif // USE_FAKE_MEDIA_STREAMS
protected:
SourceMediaStream *source_;
const TrackID track_id_;
TrackTicks played_ticks_;
PrincipalHandle principal_handle_;
};
MediaPipelineReceive::MediaPipelineReceive(
const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
SourceMediaStream *stream,
const std::string& track_id,
int level,
RefPtr<MediaSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter) :
MediaPipeline(pc, RECEIVE, main_thread, sts_thread,
track_id, level, conduit, rtp_transport,
rtcp_transport, filter),
stream_(stream),
segments_added_(0)
{
MOZ_ASSERT(stream_);
}
MediaPipelineReceive::~MediaPipelineReceive()
{
MOZ_ASSERT(!stream_); // Check that we have shut down already.
}
class MediaPipelineReceiveAudio::PipelineListener
: public GenericReceiveListener
{
public:
PipelineListener(SourceMediaStream * source, TrackID track_id,
const RefPtr<MediaSessionConduit>& conduit)
: GenericReceiveListener(source, track_id),
conduit_(conduit)
{
}
~PipelineListener()
{
if (!NS_IsMainThread()) {
// release conduit on mainthread. Must use forget()!
nsresult rv = NS_DispatchToMainThread(new
ConduitDeleteEvent(conduit_.forget()));
MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main");
if (NS_FAILED(rv)) {
MOZ_CRASH();
}
} else {
conduit_ = nullptr;
}
}
// Implement MediaStreamListener
void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) override
{
MOZ_ASSERT(source_);
if (!source_) {
MOZ_MTLOG(ML_ERROR, "NotifyPull() called from a non-SourceMediaStream");
return;
}
// This comparison is done in total time to avoid accumulated roundoff errors.
while (source_->TicksToTimeRoundDown(WEBRTC_DEFAULT_SAMPLE_RATE,
played_ticks_) < desired_time) {
int16_t scratch_buffer[AUDIO_SAMPLE_BUFFER_MAX];
int samples_length;
// This fetches 10ms of data, either mono or stereo
MediaConduitErrorCode err =
static_cast<AudioSessionConduit*>(conduit_.get())->GetAudioFrame(
scratch_buffer,
WEBRTC_DEFAULT_SAMPLE_RATE,
0, // TODO(ekr@rtfm.com): better estimate of "capture" (really playout) delay
samples_length);
if (err != kMediaConduitNoError) {
// Insert silence on conduit/GIPS failure (extremely unlikely)
MOZ_MTLOG(ML_ERROR, "Audio conduit failed (" << err
<< ") to return data @ " << played_ticks_
<< " (desired " << desired_time << " -> "
<< source_->StreamTimeToSeconds(desired_time) << ")");
// if this is not enough we'll loop and provide more
samples_length = WEBRTC_DEFAULT_SAMPLE_RATE/100;
PodArrayZero(scratch_buffer);
}
MOZ_ASSERT(samples_length * sizeof(uint16_t) < AUDIO_SAMPLE_BUFFER_MAX);
MOZ_MTLOG(ML_DEBUG, "Audio conduit returned buffer of length "
<< samples_length);
RefPtr<SharedBuffer> samples = SharedBuffer::Create(samples_length * sizeof(uint16_t));
int16_t *samples_data = static_cast<int16_t *>(samples->Data());
AudioSegment segment;
// We derive the number of channels of the stream from the number of samples
// the AudioConduit gives us, considering it gives us packets of 10ms and we
// know the rate.
uint32_t channelCount = samples_length / (WEBRTC_DEFAULT_SAMPLE_RATE / 100);
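// e.g. at 48 kHz: samples_length of 480 -> mono, 960 -> stereo.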
AutoTArray<int16_t*,2> channels;
AutoTArray<const int16_t*,2> outputChannels;
size_t frames = samples_length / channelCount;
channels.SetLength(channelCount);
size_t offset = 0;
for (size_t i = 0; i < channelCount; i++) {
channels[i] = samples_data + offset;
offset += frames;
}
DeinterleaveAndConvertBuffer(scratch_buffer,
frames,
channelCount,
channels.Elements());
outputChannels.AppendElements(channels);
segment.AppendFrames(samples.forget(), outputChannels, frames,
principal_handle_);
// Handle track not actually added yet or removed/finished
if (source_->AppendToTrack(track_id_, &segment)) {
played_ticks_ += frames;
} else {
MOZ_MTLOG(ML_ERROR, "AppendToTrack failed");
// We can't un-read the data, but that's OK since we don't want to
// buffer; just don't infinite-loop!
return;
}
}
}
private:
RefPtr<MediaSessionConduit> conduit_;
};
MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
SourceMediaStream* stream,
const std::string& media_stream_track_id,
TrackID numeric_track_id,
int level,
RefPtr<AudioSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter) :
MediaPipelineReceive(pc, main_thread, sts_thread,
stream, media_stream_track_id, level, conduit,
rtp_transport, rtcp_transport, filter),
listener_(new PipelineListener(stream, numeric_track_id, conduit))
{}

void MediaPipelineReceiveAudio::DetachMedia()
{
ASSERT_ON_THREAD(main_thread_);
if (stream_ && listener_) {
listener_->EndTrack();
stream_->RemoveListener(listener_);
stream_ = nullptr;
}
}

nsresult MediaPipelineReceiveAudio::Init() {
ASSERT_ON_THREAD(main_thread_);
MOZ_MTLOG(ML_DEBUG, __FUNCTION__);
description_ = pc_ + "| Receive audio[";
description_ += track_id_;
description_ += "]";
listener_->AddSelf();
return MediaPipelineReceive::Init();
}

#ifndef USE_FAKE_MEDIA_STREAMS
void MediaPipelineReceiveAudio::SetPrincipalHandle_m(const PrincipalHandle& principal_handle)
{
listener_->SetPrincipalHandle_m(principal_handle);
}
#endif // USE_FAKE_MEDIA_STREAMS

class MediaPipelineReceiveVideo::PipelineListener
  : public GenericReceiveListener {
public:
  PipelineListener(SourceMediaStream* source, TrackID track_id)
: GenericReceiveListener(source, track_id),
width_(0),
height_(0),
#if defined(MOZILLA_INTERNAL_API)
image_container_(),
image_(),
#endif
monitor_("Video PipelineListener")
{
#if !defined(MOZILLA_EXTERNAL_LINKAGE)
image_container_ =
LayerManager::CreateImageContainer(ImageContainer::ASYNCHRONOUS);
#endif
}

  // Implement MediaStreamListener
void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) override
{
#if defined(MOZILLA_INTERNAL_API)
ReentrantMonitorAutoEnter enter(monitor_);
RefPtr<Image> image = image_;
StreamTime delta = desired_time - played_ticks_;
    // Don't append if we've already provided a frame that supposedly
    // goes past the current desired_time. Doing so would mean a negative
    // delta, which messes up handling of the graph.
if (delta > 0) {
VideoSegment segment;
segment.AppendFrame(image.forget(), delta, IntSize(width_, height_),
principal_handle_);
// Handle track not actually added yet or removed/finished
if (source_->AppendToTrack(track_id_, &segment)) {
played_ticks_ = desired_time;
} else {
MOZ_MTLOG(ML_ERROR, "AppendToTrack failed");
return;
}
}
#endif
}

  // Accessors for external writes from the renderer
void FrameSizeChange(unsigned int width,
unsigned int height,
unsigned int number_of_streams) {
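    // number_of_streams is unused; we only record the latest frame
    // dimensions for NotifyPull() and RenderVideoFrame() to pick up.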
ReentrantMonitorAutoEnter enter(monitor_);
width_ = width;
height_ = height;
}

  void RenderVideoFrame(const unsigned char* buffer,
size_t buffer_size,
uint32_t time_stamp,
int64_t render_time,
const RefPtr<layers::Image>& video_image)
{
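    // No strides were passed in, so assume a tightly packed I420 buffer:
    // luma stride equal to the width, chroma stride of (width_ + 1) / 2.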
RenderVideoFrame(buffer, buffer_size, width_, (width_ + 1) >> 1,
time_stamp, render_time, video_image);
}

  void RenderVideoFrame(const unsigned char* buffer,
size_t buffer_size,
uint32_t y_stride,
uint32_t cbcr_stride,
uint32_t time_stamp,
int64_t render_time,
const RefPtr<layers::Image>& video_image)
{
#if defined(MOZILLA_INTERNAL_API)
    ReentrantMonitorAutoEnter enter(monitor_);
if (buffer) {
// Create a video frame using |buffer|.
RefPtr<PlanarYCbCrImage> yuvImage = image_container_->CreatePlanarYCbCrImage();
      uint8_t* frame =
        const_cast<uint8_t*>(static_cast<const uint8_t*>(buffer));
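      // |buffer| is assumed to be planar I420: a y_stride * height_ luma
      // plane followed by Cb and Cr planes of
      // cbcr_stride * ((height_ + 1) / 2) bytes each.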
PlanarYCbCrData yuvData;
yuvData.mYChannel = frame;
yuvData.mYSize = IntSize(y_stride, height_);
yuvData.mYStride = y_stride;
yuvData.mCbCrStride = cbcr_stride;
yuvData.mCbChannel = frame + height_ * yuvData.mYStride;
yuvData.mCrChannel = yuvData.mCbChannel + ((height_ + 1) >> 1) * yuvData.mCbCrStride;
yuvData.mCbCrSize = IntSize(yuvData.mCbCrStride, (height_ + 1) >> 1);
yuvData.mPicX = 0;
yuvData.mPicY = 0;
yuvData.mPicSize = IntSize(width_, height_);
yuvData.mStereoMode = StereoMode::MONO;
if (!yuvImage->CopyData(yuvData)) {
MOZ_ASSERT(false);
return;
}
image_ = yuvImage;
}
#ifdef WEBRTC_GONK
else {
// Decoder produced video frame that can be appended to the track directly.
MOZ_ASSERT(video_image);
image_ = video_image;
}
#endif // WEBRTC_GONK
#endif // MOZILLA_INTERNAL_API
}

private:
int width_;
int height_;
#if defined(MOZILLA_INTERNAL_API)
RefPtr<layers::ImageContainer> image_container_;
RefPtr<layers::Image> image_;
#endif
mozilla::ReentrantMonitor monitor_; // Monitor for processing WebRTC frames.
// Protects image_ against:
// - Writing from the GIPS thread
// - Reading from the MSG thread
};

class MediaPipelineReceiveVideo::PipelineRenderer : public VideoRenderer
{
public:
explicit PipelineRenderer(MediaPipelineReceiveVideo *pipeline) :
pipeline_(pipeline) {}
void Detach() { pipeline_ = nullptr; }

  // Implement VideoRenderer
void FrameSizeChange(unsigned int width,
unsigned int height,
unsigned int number_of_streams) override
{
pipeline_->listener_->FrameSizeChange(width, height, number_of_streams);
}
void RenderVideoFrame(const unsigned char* buffer,
size_t buffer_size,
uint32_t time_stamp,
int64_t render_time,
const ImageHandle& handle) override
{
pipeline_->listener_->RenderVideoFrame(buffer, buffer_size,
time_stamp, render_time,
handle.GetImage());
}

  void RenderVideoFrame(const unsigned char* buffer,
size_t buffer_size,
uint32_t y_stride,
uint32_t cbcr_stride,
uint32_t time_stamp,
int64_t render_time,
const ImageHandle& handle) override
{
pipeline_->listener_->RenderVideoFrame(buffer, buffer_size,
y_stride, cbcr_stride,
time_stamp, render_time,
handle.GetImage());
}

private:
MediaPipelineReceiveVideo *pipeline_; // Raw pointer to avoid cycles
};

MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
SourceMediaStream *stream,
const std::string& media_stream_track_id,
TrackID numeric_track_id,
int level,
RefPtr<VideoSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter) :
MediaPipelineReceive(pc, main_thread, sts_thread,
stream, media_stream_track_id, level, conduit,
rtp_transport, rtcp_transport, filter),
renderer_(new PipelineRenderer(this)),
listener_(new PipelineListener(stream, numeric_track_id))
{}

void MediaPipelineReceiveVideo::DetachMedia()
{
ASSERT_ON_THREAD(main_thread_);
  // Stop generating video and thus stop invoking the PipelineRenderer
  // and PipelineListener. The renderer holds a raw pointer to the pipeline
  // to avoid cycles, and the render callbacks are invoked from a different
  // thread, so simple null-checks would cause TSAN bugs without locks.
static_cast<VideoSessionConduit*>(conduit_.get())->DetachRenderer();
if (stream_ && listener_) {
listener_->EndTrack();
stream_->RemoveListener(listener_);
stream_ = nullptr;
}
}

nsresult MediaPipelineReceiveVideo::Init() {
ASSERT_ON_THREAD(main_thread_);
MOZ_MTLOG(ML_DEBUG, __FUNCTION__);
description_ = pc_ + "| Receive video[";
description_ += track_id_;
description_ += "]";
#if defined(MOZILLA_INTERNAL_API)
listener_->AddSelf();
#endif
// Always happens before we can DetachMedia()
static_cast<VideoSessionConduit *>(conduit_.get())->
AttachRenderer(renderer_);
return MediaPipelineReceive::Init();
}

#ifndef USE_FAKE_MEDIA_STREAMS
void MediaPipelineReceiveVideo::SetPrincipalHandle_m(const PrincipalHandle& principal_handle)
{
listener_->SetPrincipalHandle_m(principal_handle);
}
#endif // USE_FAKE_MEDIA_STREAMS

} // namespace mozilla