更多deepstream的實戰應用:
主要依賴gstreamer中的rtpbin元素實踐
RTP bin結合了rtpsession
、rtpssrcdemux
、rtpjitterbuffer
和rtpptdemux
的功能,將它們集成為一個元素。它允許多個RTP會話,這些會話將使用RTCP SR封包進行同步。
rtpbin
配置了一些請求端口(request pads),這些端口定義了要啟動的功能,類似於rtpsession
元素。
使用rtpbin
進行RTP接收與發送
recv_rtp_sink_%u
端口
recv_rtp_sink_%u
端口上接收的數據將在rtpsession
管理器中進行處理,經過驗證後轉發到rtpssrcdemux
元素。rtpjitterbuffer
。在從抖動緩衝區釋放數據包後,它們將被轉發到rtpptdemux
元素。rtpptdemux
元素將根據有效載荷類型解多路復用數據包,並將會話號碼、SSRC和載荷類型分別作為端口名稱,在rtpbin
上創建一個唯一的pad recv_rtp_src_%u_%u_%u
。send_rtp_sink_%u
端口send_rtp_src_%u
端口。如果未提供會話號碼,則將返回最低可用會話的端口。send_rtp_src_%
u端口。使用rtpbin
進行RTCP接收與發送
recv_rtcp_sink_%u
端口。必須在端口名稱中指定會話號碼。send_rtcp_src_%u
端口。在此端口上推送的封包包含應發送給會話中所有參與者的SR/RR RTCP報告。使用get-internal-session
屬性來訪問rtpbin
的內部統計信息。此動作信號提供對RTPSession對象的訪問,RTPSession對象進一步提供了獲取內部源和其他源的動作信號。
範例1:單純使用rtpbin
接收RTP packets
接收來自端口5000的RTP數據,並發送到rtpbin
中的session 0
gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
範例2: 視訊、音訊分別接收並發送RTP與RTCP packets
RTP packets管道
udpsink
的port 5000udpsink
的port 5002RTCP packets管道
udpsink
上配置sync=false
和async=false
gst-launch-1.0 rtpbin name=rtpbin \
v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
rtpbin.send_rtp_src_0 ! udpsink port=5000 \
rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false \
udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \
audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \
rtpbin.send_rtp_src_1 ! udpsink port=5002 \
rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false \
udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
範例3: 視訊、音訊分別接收並發送RTP與RTCP packets並撥放
udpsrc
接收port 5000上的H263視頻,通過rtpbin
的recv_rtp_sink_0
發送到session 0,解包、解碼並顯示影片rtpbin
的recv_rtp_sink_1
發送到session 1,解包、解碼並播放rtpbin
的recv_rtcp_sink_0
以port 5000+1接收session 0的RTCP封包rtpbin
的recv_rtcp_sink_0
以port 5002+1接收session 1的RTCP封包
gst-launch-1.0 -v rtpbin name=rtpbin \
udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
port=5000 ! rtpbin.recv_rtp_sink_0 \
rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \
udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \
rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false \
udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
port=5002 ! rtpbin.recv_rtp_sink_1 \
rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \
udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \
rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
uridecodebin
進行客製修改對應Deepstream中使用uridecodebin
元素作為接收rtsp stream的工具,因此必須根據使用的插件內容來發送訊號、訪問RTCP SR、並捕獲、解析RTCP 封包
uridecodebin
內部完整的的element組成decodebin
內部會調用rtspsrc
來解析範例中的Video Decode Pipeline
rtspsrc
負責連接 RTSP 服務器並讀取數據。rtspsrc
就像一個實時信號源,因此只會在播放狀態下生成數據。rtspsrc
在內部使用rtpbin
實例化一個 "RTP 會話管理器"(RTP session manager)元件,負責與 rtsp server雙向對話,取得RTCP訊息decodebin
與rtspsrc
特性 | decodebin | rtspsrc |
---|---|---|
定義 | - decodebin 是一個自動解碼器元件 (element) |
- rtspsrc 是一個用於接收 RTSP 流的元件 (element) |
功能 | - 自動檢測和使用適當的解碼器來解碼多媒體數據 | - 處理 RTSP 協議,從 RTSP 伺服器接收數據 |
- 可以處理音頻和視頻數據 | - 可以接收 RTP 和 RTCP 數據 | |
用途 | - 用於動態解碼多媒體數據,不需要事先知道數據的格式 | - 用於從 RTSP 伺服器接收多媒體流 |
連接性 | - 通常連接到數據源 (例如 filesrc 或 rtspsrc ) 來接收未解碼的多媒體數據 |
- 通常連接到解碼器 (例如 decodebin ) 或其他處理 RTP 數據的元件,以獲取解碼後的多媒體數據 |
特殊功能 | - 當它接收到數據時,它會動態地選擇和更改解碼器 | - 支持 RTSP 的各種功能,如認證、選擇媒體流、控制播放速度等 |
def on_receiving_rtcp_callback
rtcp_packet.sr_get_sender_info()
裡面包含了:
# Process RTCP Packets ----------------
def on_receiving_rtcp_callback(session, buffer: Gst.Buffer):
"""
Fetch the NTP and RTP reference Timestamp in the RTCP Sender Report
Map RTCP and RTP packet data to calculate latency.
"""
rtcp_buffer = GstRtp.RTCPBuffer()
if not GstRtp.RTCPBuffer.map(buffer, Gst.MapFlags.READ, rtcp_buffer):
logging.warning("\t\t[on_receiving_rtcp_callback] : Unable to map RTCP buffer")
return Gst.PadProbeReturn.PASS
try:
rtcp_packet = GstRtp.RTCPPacket()
if not rtcp_buffer.get_first_packet(rtcp_packet):
logging.warning("\t\t[on_receiving_rtcp_callback] : Unable to get the first RTCP packet")
return Gst.PadProbeReturn.PASS
while rtcp_packet:
if rtcp_packet.get_type() == GstRtp.RTCPType.SR:
rtcp_data = *rtcp_packet.sr_get_sender_info()
if not rtcp_packet.move_to_next():
break
finally: # Add finally block to ensure buffer is unmapped
GstRtp.RTCPBuffer.unmap(rtcp_buffer)
return Gst.PadProbeReturn.PASS
def rtp_depay_sink_pad_buffer_probe
rtpdepay
# Process RTP Packets and Calculate RTSP Latency -----------------------------
def rtp_depay_sink_pad_buffer_probe(
self, pad: Gst.Pad, info: Gst.PadProbeInfo, u_data: Any
) -> Gst.PadProbeReturn:
"""
Handle the sink pad buffer probe for RTP buffer in depay.
Parameters:
- pad (Gst.Pad): The pad to fetch the buffer from.
- info (Gst.PadProbeInfo): Information about the buffer.
"""
buffer = info.get_buffer()
res, rtp_buffer = GstRtp.RTPBuffer.map(buffer, Gst.MapFlags.READ)
try:
rtp_data = [
rtp_buffer.get_timestamp(),
rtp_buffer.get_ssrc(),
rtp_buffer.get_payload_type(),
rtp_buffer.get_marker(),
rtp_buffer.get_seq()
]
finally:
rtp_buffer.unmap()
return Gst.PadProbeReturn.OK
uridecodebin
中使用Sample code modified from NVIDIA-AI-IOT/deepstream_python_apps/deepstream_test1_rtsp_in_rtsp_out.py
由於uridecodebin
內部會自動建立接收rtsp等相關的元件,且內部會使用高階APIrtspsrc
,因此需要建立一個manager的callback函式,讓rtspsrc
內部rtpbin
的連接到RTPsession物件,透過rtspsrc.connect("new manager", new_manager_callback)
這樣的方式連接
def new_manager_callback
對應前面所提:rtspsrc
在內部使用rtpbin實例化一個 “RTP 會話管理器”(RTP session manager)元件,負責與 rtsp server雙向對話,取得RTCP訊息
rtpsrc
中的元件為rtpbin
rtpbin
來作為RTCP接收器,使用get_request_pad
動態建立recv_rtcp_sink_%u
接口
- use
rtpbin
as an RTCP receiver, request arecv_rtcp_sink_%u
pad. The session number must be specified in the pad name.- Access to the internal statistics of
rtpbin
is provided with theget-internal-session
property. This action signal gives access to the RTPSession object which further provides action signals to retrieve the internal source and other sources.
def new_manager_callback (rtspsrc, manager, index, *args):
'''
Manage RTP sessions for accessing RTCP packets
'''
element_name = manager.get_factory().get_name()
# print(f"\t[new_manager_callback] : List element_name '{element_name}-{index}' in Decodebin ")
if element_name != "rtpbin":
# print(f"\t[new_manager_callback] : Manager is of type {element_name}, not rtspsrc. ")
return
# Establish RTP session in src pad for fetching RTCP packets
sinkpad = manager.get_request_pad(f"recv_rtcp_sink_{index}")
session = manager.emit("get-internal-session", 0)
session.connect("on-receiving-rtcp", on_receiving_rtcp_callback)
上面的manager的相關操作等價於下面的操作方式:
# Receive RTCP Packets -------------------------
# Configure rtpbin to handle the RTP/RTCP session
recv_rtcp_sink_pad = Gst.Element.get_request_pad(rtpbin, f"recv_rtcp_sink_{index}")
session = rtpbin.emit("get-internal-session", 0)
session.connect("on-receiving-rtcp", on_receiving_rtcp_callback)
def decodebin_child_added
uridecodebin
中rtspsrc
元件,準備連接建立"RTP session manager"以接收RTCP資料
[decodebin_child_added] : Element type of 'source-00': 'rtspsrc'
[decodebin_child_added] : Successfully connected 'on-receiving-rtcp' with 'source-00'.
uridecodebin
中,名稱有"depay"的元件,插入解析RTP Buffer的探針函示(rtp_depay_sink_pad_buffer_probe
)以獲取資料def decodebin_child_added(child_proxy, Object, name, user_data, index):
print("Decodebin child added:", name, "\n")
if name.find("decodebin") != -1:
Object.connect("child-added", new_manager_callback, user_data)
if "source" in name:
element_type = Object.get_factory().get_name()
if element_type == "rtspsrc":
try:
# Use functools.partial to freeze the 'index' parameter for the callback
# Object.connect("new-manager", new_manager_callback)
Object.connect("new-manager", partial(new_manager_callback, index=index))
# Add Probe for *depay, which is used to extract video from RTP packets.
# uridecodebin is a high-level element that dynamically creates its internal pipeline, including elements like rtph264depay,
# based on the media it's given. This means that the rtph264depay element may not exist until the pipeline is set to the PAUSED
# or PLAYING state and has detected that the incoming stream is H.264 encoded.
if ("depay") in name:
element_type = Object.get_factory().get_name()
depay_sinkpad = Object.get_static_pad("sink")
if depay_sinkpad:
depay_sinkpad.add_probe(
Gst.PadProbeType.BUFFER,
rtp_depay_sink_pad_buffer_probe,
0,
)
def create_source_bin
partial(decodebin_child_added, index=index)
連結decodebin_child_added
函式,方便傳遞index
參數進入、增加可讀性def create_source_bin(index, uri):
bin_name = "source-bin-%02d" % index
nbin = Gst.Bin.new(bin_name)
...
uri_decode_bin.connect("child-added", partial(decodebin_child_added, index=index), nbin)
...
return nbin
rtspsrc
元素來接收rtsp的uri,並以此發送"new-manager"訊號(emit signal)給rtpbin
rtph264depay
的輸入sink pad)上添加探針來連接該功能
pipeline = Gst.parse_launch("rtspsrc name = rtspsrc locationmrtsp://some.server/url | rtph264depay nane=depay ! appsink nane=sink")
rtspsrc = pipeline.get_by_name('rtspsrc')
rtspsrc.connect("new-manager" ‚ new_manager_callback)
depay = pipeline.get_by_name('depay')
sinkpad = depay.get_static_pad('sink')
probeID = sinkpad.add_probe(Gst.PadProbeType.BUFFER, calculate_tinestamp)
rtpbin
收到訊號,透過def new_manager_callbacK
函式獲得的對話訊息訪問"RTCP SR"rtpbin
元素時,它將作為管理器參數傳遞給我們的回調函數
def new_manager_callback ( rtspsrc, manager):
sinkpad = manager.get_request_pad("recv_rtp_sink_0")
session = manager.emit("get-internal-session", 0)
if session:
session.connect("on-receiving-rtcp", on_receiving_rtcp_callback)
else:
print("Failed to get RTPSession from the manager")
def on_receiving_rtcp_callback
,使用GstRtp.RTCPBuffer
捕獲、解析RTCP 數據封包def on_receiving_rtcp_callback
函數將傳遞一個 Gst.Buffer 類型的緩沖區
def on_receiving_rtcp_callback (session, buffer: Gst.Buffer):
rtcp_buffer = GstRtp.RTCPBuffer()
res = GstRtp.RTCPBuffer.map (buffer, Gst.MapFlags.READ, rtcp_buffer)
rtcp_packet = GstRtp.RTCPPacket()
packet = rtcp_buffer.get_first _packet ( rtcp_packet)
while True:
if rtcp_packet.get_type() == GstRtp.RTCPType.SR :
si = rtcp_packet.sr_get_sender_info()
ntp_time = si[1]
rtp_time = si[2]
if rtcp_packet.move_to_next () ==False:
break
def calculate_timestamp
FRAME_TS= LAST_NTP + (RTP — LAST_RTP) / 90000
Where
def calculate_timestamp(pad, info):
res, rtpBuff = GstRtp.RTPBuffer.map(info.get_buffer(), Gst.MapFlags.READ)
tv = timevalue()
last_rtcp_ntp_time = c_uint64(ntp_timestamp)
LibNTP.ntp2tv(byref(last_rtcp_ntp_time), byref(tv))
rtp_diff = float(rtpBuff.get_timestamp() - rtp_timestamp) / 90000.0
timestamp = float(tv.tv_sec) + float(tv.tv_usec) / 1000000.0 + rtp_diff
return Gst.PadProbeReturn.OK
1.Get the RTPSession object from the RtpBin
g_signal_emit_by_name (rtpbin, "get-internal-session", id, &session);
2.Attach to the "on-receiving-rtcp" signal (or to the more specialized ones):
g_signal_connect_after (session, "on-receiving-rtcp", G_CALLBACK (on_rtcp_callback), my_callback_data);
3.Look for the RTCP message you want
gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp_buffer)); new_packet = gst_rtcp_buffer_get_first_packet (&rtcp_buffer, &rtcp_packet); while (gst_rtcp_packet_move_to_next (&rtcp_packet) { type = gst_rtcp_packet_get_type (&rtcp_packet); switch (type) { ... } }
Most of GStreamer's key RTP components live in gst-plugins-good:
rtpmanager
plugin contains elements like rtpbin and rtpjitterbufferrtp
plugin contains RTP payloading and depayloading elements for many different codecs and container formatsrtpbin
is the high-level RTP component and supports sending and receiving, just sending or just receiving data, with and without RTCP support. This is the bin that does it all: it adapts dynamically to your needs based on the requested pads; it also contains an rtpjitterbuffer.To use rtpbin as an RTP receiver, request a
recv_rtp_sink_%u
pad. The session number must be specified in the pad name. Data received on therecv_rtp_sink_%u
pad will be processed in the rtpsession manager and after being validated forwarded on rtpssrcdemux element. …
Access to the internal statistics of rtpbin is provided with the get-internal-session property. This action signal gives access to the RTPSession object which further provides action signals to retrieve the internal source and other sources.
g_signal_emit_by_name (rtpbin, "get-internal-session", id, &ret);
rtspsrc will internally instantiate an RTP session manager element that will handle the RTCP messages to and from the server, jitter removal, packet reordering along with providing a clock for the pipeline. This feature is implemented using the
gstrtpbin
element.
To also use rtpbin as an RTCP receiver, request a
recv_rtcp_sink_%u
pad. The session number must be specified in the pad name.
new_manager_callback (GstElement * rtspsrc,
GstElement * manager,
gpointer udata)
def on_receiving_rtcp_callback (session, buffer, udata):
#python callback for the 'on-receiving-rtcp' signal
GstRtp.RTPBuffer.get_timestamp
Get the timestamp of the RTP packet in buffer.
def GstRtp.RTPBuffer.get_timestamp (self):
#python wrapper for 'gst_rtp_buffer_get_timestamp'
GstRtp.RTPBuffer.get_marker
Check if the marker bit is set on the RTP packet in buffer.
def GstRtp.RTPBuffer.get_marker (self):
#python wrapper for 'gst_rtp_buffer_get_marker'
GstRtp.RTPBuffer.get_ssrc
Get the SSRC of the RTP packet in buffer.
def GstRtp.RTPBuffer.get_ssrc (self):
#python wrapper for 'gst_rtp_buffer_get_ssrc'
GstRTCPBuffer 的c++原始程式碼
The GstRTPCBuffer helper functions makes it easy to parse and create regular Gst.Buffer objects that contain compound RTCP packets. These buffers are typically of 'application/x-rtcp' Gst.Caps.
An RTCP buffer consists of 1 or more GstRtp.RTCPPacket structures that you can retrieve with GstRtp.RTCPBuffer.get_first_packet. GstRtp.RTCPPacket acts as a pointer into the RTCP buffer; you can move to the next packet with GstRtp.RTCPPacket.move_to_next.
GstRtp.RTCPBuffer.get_first_packet
Initialize a new GstRtp.RTCPPacket pointer that points to the first packet in rtcp.
def GstRtp.RTCPBuffer.get_first_packet (self, packet):
#python wrapper for 'gst_rtcp_buffer_get_first_packet'
GstRtp.RTCPBuffer.map
The resulting RTCP buffer state is stored in rtcp.
def GstRtp.RTCPBuffer.map (buffer, flags, rtcp):
#python wrapper for 'gst_rtcp_buffer_map'
Open buffer for reading or writing, depending on flags.
def GstRtp.RTCPPacket.sr_get_sender_info (self):
#python wrapper for 'gst_rtcp_packet_sr_get_sender_info'
gst_rtcp_packet_sr_get_sender_info (GstRTCPPacket * packet,
guint32 * ssrc,
guint64 * ntptime,
guint32 * rtptime,
guint32 * packet_count,
guint32 * octet_count)
source code : gst-libs/gst/rtp/gstrtcpbuffer.c/gst_rtcp_packet_sr_get_sender_info
/**
* gst_rtcp_packet_sr_get_sender_info:
* @packet: a valid SR #GstRTCPPacket
* @ssrc: result SSRC
* @ntptime: result NTP time
* @rtptime: result RTP time
* @packet_count: result packet count
* @octet_count: result octet count
*
* Parse the SR sender info and store the values.
*/
void
gst_rtcp_packet_sr_get_sender_info (GstRTCPPacket * packet, guint32 * ssrc,
guint64 * ntptime, guint32 * rtptime, guint32 * packet_count,
guint32 * octet_count)
{
guint8 *data;
g_return_if_fail (packet != NULL);
g_return_if_fail (packet->type == GST_RTCP_TYPE_SR);
g_return_if_fail (packet->rtcp != NULL);
g_return_if_fail (packet->rtcp->map.flags & GST_MAP_READ);
data = packet->rtcp->map.data;
/* skip header */
data += packet->offset + 4;
if (ssrc)
*ssrc = GST_READ_UINT32_BE (data);
data += 4;
if (ntptime)
*ntptime = GST_READ_UINT64_BE (data);
data += 8;
if (rtptime)
*rtptime = GST_READ_UINT32_BE (data);
data += 4;
if (packet_count)
*packet_count = GST_READ_UINT32_BE (data);
data += 4;
if (octet_count)
*octet_count = GST_READ_UINT32_BE (data);
}