画面推流代码解析
一、WebSocket 端点注册
routers/stream_router.py
@router.websocket("/api/ws/device/stream")
async def device_stream(websocket: WebSocket):
await websocket.accept() # 接受连接(跳过 origin 校验)
serial = websocket.query_params.get("serial")
device = adb.device(serial)
server = ScrcpyServer(device, version="2.7")
try:
await server.handle_unified_websocket(websocket)
finally:
server.close()前端通过 ws://.../api/ws/device/stream?serial=<设备序列号> 建立连接。
二、启动 scrcpy 服务端并建立 Socket 连接
remote/scrcpy.py: — _start_scrcpy_server()
后端通过 ADB shell 在设备上启动 scrcpy Java 进程:
cmds = [
'CLASSPATH=/data/local/tmp/scrcpy_server.jar',
'app_process', '/',
'com.genymobile.scrcpy.Server', self.version,
'log_level=info', 'max_size=1024', 'max_fps=30',
'video_bit_rate=8000000', 'tunnel_forward=true',
'send_frame_meta=true',
'control=true', 'audio=false',
...
]关键参数:
| 参数 | 说明 |
|---|---|
tunnel_forward=true | 使用 ADB 前向隧道(无需 root) |
max_size=1024 | 限制分辨率,降低带宽 |
max_fps=30 | 限制帧率 |
send_frame_meta=true | 每帧附带元数据(PTS 时间戳等) |
remote/scrcpy.py:58 — _connect_scrcpy()
通过 ADB Local Abstract Socket 建立两条连接(视频 + 控制):
@retry.retry(exceptions=AdbError, tries=20, delay=0.1)
def _connect_scrcpy(self, device: AdbDevice) -> socket.socket:
return device.create_connection(Network.LOCAL_ABSTRACT, 'scrcpy')使用
retry装饰器重试 20 次,因为 scrcpy 服务端启动需要一点时间。
remote/scrcpy.py:61 — _parse_scrcpy_info()
视频连接建立后,首先解析握手信息:
dummy_byte = conn.recv(1) # 握手确认字节 0x00
device_name = conn.recv(64) # 设备名(64字节)
codec = conn.recv(4) # 编解码器标识
resolution_data = conn.recv(8) # 分辨率(两个 uint32 大端序)
self.resolution_width, self.resolution_height = struct.unpack(">II", resolution_data)三、统一 WebSocket 处理器
remote/scrcpy.py:128 — handle_unified_websocket()
async def handle_unified_websocket(self, websocket: WebSocket, serial=''):
# 1. 先发送分辨率,前端据此初始化画布尺寸
await websocket.send_text(json.dumps({
"type": "resolution",
"width": self.resolution_width,
"height": self.resolution_height,
}))
# 2. 并发启动视频推流和控制命令处理
video_task = asyncio.create_task(self._stream_video_to_websocket(...))
control_task = asyncio.create_task(self._handle_control_websocket(...))
try:
await asyncio.gather(video_task, control_task)
finally:
for task in (video_task, control_task):
if not task.done():
task.cancel()两个任务通过 asyncio.gather() 并发运行,任何一方断开都会触发 finally 取消另一个任务,保证资源释放。
四、视频流推送
remote/scrcpy.py:151 — _stream_video_to_websocket()
async def _stream_video_to_websocket(self, conn: socket.socket, ws: WebSocket):
conn.setblocking(False) # 非阻塞模式,配合 asyncio 事件循环
while True:
if ws.client_state.name != "CONNECTED":
break
# 异步读取 socket 数据(最多 1MB)
data = await asyncio.get_event_loop().sock_recv(conn, 1024 * 1024)
if not data:
raise ConnectionError("Video stream ended unexpectedly")
# 以二进制帧推送给前端
await ws.send_bytes(data)关键点:
- socket 设为非阻塞模式,通过
loop.sock_recv()与 asyncio 事件循环集成,不会阻塞其他协程 - 每次读取最多 1MB,直接透传给前端(H.264 裸流),由前端使用 WebCodecs API 解码
五、控制命令处理
remote/scrcpy.py:168 — _handle_control_websocket()
前端通过同一条 WebSocket 发送 JSON 格式的控制命令:
async def _handle_control_websocket(self, ws: WebSocket):
while True:
message = await ws.receive_text()
message = json.loads(message)
width, height = self.resolution_width, self.resolution_height
match message['type']:
case 'touchDown':
self.controller.down(int(xP * width), int(yP * height), ...)
case 'touchMove':
self.controller.move(int(xP * width), int(yP * height), ...)
case 'touchUp':
self.controller.up(int(xP * width), int(yP * height), ...)
case 'keyEvent':
self.device.shell(f'input keyevent {event_number}')
case 'text':
self.device.shell(f'am broadcast -a SONIC_KEYBOARD --es msg \'{text}\'')
case 'ping':
await ws.send_text(json.dumps({"type": "pong"}))前端传递的坐标是比例值(xP/yP 范围 0~1),后端乘以实际分辨率换算为绝对像素坐标,再通过 ScrcpyTouchController 写入控制 socket 发送给设备。
六、scrcpy 3.x 的双工管道方案
remote/scrcpy3.py:54 — v3.3.3 采用了更简洁的双工抽象:
def stream_to_websocket(self, ws: WebSocket):
socket_duplex = RWSocketDuplex(self._video_sock, self._control_sock)
websocket_duplex = WebSocketDuplex(ws)
return pipe_duplex(socket_duplex, websocket_duplex)remote/pipe.py:13 — pipe_duplex() 实现真正双向透明转发:
async def pipe_duplex(a, b):
task_ab = asyncio.create_task(_pipe_oneway(a, b)) # socket → websocket
task_ba = asyncio.create_task(_pipe_oneway(b, a)) # websocket → socket
done, pending = await asyncio.wait([task_ab, task_ba],
return_when=asyncio.FIRST_COMPLETED)
for t in pending:
t.cancel() # 任一方断开,立即取消另一个v3.x 将视频和控制统一为单一双工管道,由前端的 WebCodecs 自行解析协议,后端仅负责字节透传,架构更简洁。
消息协议汇总
| 方向 | 类型 | 格式 | 说明 |
|---|---|---|---|
| 后端→前端 | 文本 JSON | {"type":"resolution","width":W,"height":H} | 连接建立时首帧 |
| 后端→前端 | 二进制 | H.264 裸流 | 视频帧,持续推送 |
| 后端→前端 | 文本 JSON | {"type":"pong"} | 心跳响应 |
| 前端→后端 | 文本 JSON | {"type":"touchDown","xP":0.5,"yP":0.3} | 触摸按下 |
| 前端→后端 | 文本 JSON | {"type":"touchMove","xP":0.5,"yP":0.3} | 触摸移动 |
| 前端→后端 | 文本 JSON | {"type":"touchUp","xP":0.5,"yP":0.3} | 触摸抬起 |
| 前端→后端 | 文本 JSON | {"type":"keyEvent","data":{"eventNumber":4}} | 按键事件 |
| 前端→后端 | 文本 JSON | {"type":"text","detail":"hello"} | 文字输入 |
| 前端→后端 | 文本 JSON | {"type":"ping"} | 心跳保活 |