中国 · 南京 · 栖霞区紫东路2号紫东国际创意园B3-2幢5F
+86-18994094214 (仅工作日:8:30~17:30)
contact@oakchina.cn

OAK PoE设备独立模式详细介绍及使用

OAK PoE设备独立模式详细介绍及使用

POE 设备独立模式

独立运行模式意味着 OAK 相机可以脱离其他计算机自己独立工作,并且可以通过相机端编程来实现和其他设备的自定义数据交互。该模式下工作,需要把 OAK 相机端运行的程序和 NN 模型全部加载到 OAK 相机内置的 Flash 中保存。设备启动后,OAK 相机的主控可以将 Flash 中的程序和 NN 模型加载到 OAK 的闪存中运行。

由于主机和设备之间不会有任何通信,因此首先需要删除所有 XLinkOut 和 XLinkIn 节点。这意味着设备只能通过 Script 节点(网络协议:HTTP/TCP/UDP)与 “外部世界” 通信。

Flash 引导加载程序

Bootloader 与 depthai 打包在一起,因此如果您有最新的 depthai 版本,您可以闪存最新的 Bootloader 版本。要 Flash 引导加载程序,使用 python poe_standalone/standalone.py flash_bootloaderpoe_standalone flash_bootloader 。
要查看其背后的 API 代码,请参见FlashBootloader示例代码。

Flash 管道

已经在设备上Flash 引导加载程序之后,可以将管道连同它的资产 (NN 模型) 一起 Flash 到 OAK 的闪存:
python poe_standalone/standalone.py -P <pipeline> flash_pipelinepoe_standalone -P <pipeline> flash_pipeline

代码参考

import depthai as dai

pipeline = dai.Pipeline()

# Define standalone pipeline; add nodes and link them
# cam = pipeline.create(dai.node.ColorCamera)
# script = pipeline.create(dai.node.Script)
# ...

# Flash the pipeline
(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
bootloader = dai.DeviceBootloader(bl)
progress = lambda p: print(f'\rFlashing progress: {p:.2%}')
bootloader.flash(progress, pipeline)

成功 Flash 管道后,当您启动设备时,它将自动启动。如果要更改管道,只需再次重新 Flash。

管道可视化

参考 DepthAI Pipeline Graph

  • simple_pipeline
  • yolo_stereo_decoding

代码示例

github 仓库:OAK-POE Standalone

安装(可选)

pip install -e .

或者

pip install .

使用帮助

python poe_standalone/standalone.py -h 或 poe_standalone -h


 Usage: standalone.py [OPTIONS] COMMAND [ARGS]...                               
                                                                                
 OAK POE STANDALONE SCRIPTS                                                     
                                                                                
╭─ Options ────────────────────────────────────────────────────────────────────╮
│ --device_ip        -device_ip  IP ADDRESS             The IP of the OAK      │
│                                                       device you want to     │
│                                                       connect to. The        │
│                                                       default is to list all │
│                                                       for you to choose      │
│                                                       from.                  │
│ --host_ip          -host_ip    IP ADDRESS             The IP of the Host pc  │
│                                                       you want to connect    │
│                                                       to.                    │
│                                                       [default: 192.168.0.1] │
│ --pipeline         -P          [script_http_server|s  Pipeline you want to   │
│                                cript_mjpeg_server|tc  start.                 │
│                                p_streaming_server|tc  [default:              │
│                                p_streaming_server_co  script_http_server]    │
│                                nfig_focus|tcp_stream                         │
│                                ing_client|yolo_decod                         │
│                                ing|yolo_stereo_decod                         │
│                                ing|custom_pipeline]                          │
│ --custom_pipeline  -cp         FILE PATH              The custom pipeline    │
│                                                       you want to start.     │
│ --port             -p          INTEGER                NETWORK PORT           │
│                                                       [default: 5000]        │
│ --blob_path        -b          FILE PATH              YOLO Blob path to use  │
│ --config_path      -c          FILE PATH              YOLO Config path to    │
│                                                       use                    │
│ --help             -h                                 Show this message and  │
│                                                       exit.                  │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ───────────────────────────────────────────────────────────────────╮
│ clear_pipeline    Clear the flashed app on the device                        │
│ flash_bootloader  Flash the bootloader to the device                         │
│ flash_pipeline    Flash the pipeline to the device                           │
│ run               Run the program in host mode                               │
│ save_pipeline     Saves application package to a file which can be flashed   │
│                   to depthai device.                                         │
│ set_ip            Sets IP of the POE device                                  │
╰──────────────────────────────────────────────────────────────────────────────╯

示例管道

管道启动:

python poe_standalone/standalone.py -P script_mjpeg_server [flash_pipeline]

poe_standalone -P script_mjpeg_server [flash_pipeline]

主机端测试程序

启动:

python poe_host/tcp_streaming_server_host.py 

tcp_streaming_server_host

自定义管道参考代码

#!/usr/bin/env python3
# coding=utf-8
import time

import depthai as dai
from loguru import logger
from string import Template

try:
    from poe_standalone.utils import getDeviceInfo
except ImportError:
    from utils import getDeviceInfo


def create_pipeline(port=5000, blob_path=None, config_path=None, host_ip=None):
    # 开始定义管道
    pipeline = dai.Pipeline()

    # 定义源 - 彩色相机
    cam = pipeline.create(dai.node.ColorCamera)
    # 视频编码器
    jpeg = pipeline.create(dai.node.VideoEncoder)
    jpeg.setDefaultProfilePreset(cam.getFps(), dai.VideoEncoderProperties.Profile.MJPEG)

    # 脚本节点
    script = pipeline.create(dai.node.Script)
    script.setProcessor(dai.ProcessorType.LEON_CSS)
    scrpt_str = Template("""
        from http.server import BaseHTTPRequestHandler
        import socketserver
        import socket
        import fcntl
        import struct

        PORT = ${_PORT}
        ctrl = CameraControl()
        ctrl.setCaptureStill(True)

        def get_ip_address(ifname):
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            return socket.inet_ntoa(fcntl.ioctl(
                s.fileno(),
                -1071617759,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15].encode())
            )[20:24])

        class HTTPHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                if self.path == '/':
                    self.send_response(200)
                    self.end_headers()
                    self.wfile.write(b'<h1>[DepthAI] Hello, world!</h1><p>Click <a href="img">here</a> for an image</p>')
                elif self.path == '/img':
                    node.io['out'].send(ctrl)
                    jpegImage = node.io['jpeg'].get()
                    self.send_response(200)
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', str(len(jpegImage.getData())))
                    self.end_headers()
                    self.wfile.write(jpegImage.getData())
                else:
                    self.send_response(404)
                    self.end_headers()
                    self.wfile.write(b'Url not found...')

        with socketserver.TCPServer(("", PORT), HTTPHandler) as httpd:
            node.warn(f"Serving at {get_ip_address('re0')}:{PORT}")
            httpd.serve_forever()
    """)
    script.setScript(
        scrpt_str.safe_substitute(_PORT=port)
    )

    # 连接
    cam.still.link(jpeg.input)
    script.outputs["out"].link(cam.inputControl)
    jpeg.bitstream.link(script.inputs["jpeg"])

    return pipeline


if __name__ == "__main__":
    with logger.catch():
        # Connect to device with pipeline
        device_info = getDeviceInfo()
        with dai.Device(create_pipeline(port=5000, blob_path=None, config_path=None, host_ip=None),
                        device_info) as device:
            print(f"\t>>> Name: {device_info.name}")
            print(f"\t>>> MXID: {device.getMxId()}")
            print(f"\t>>> Cameras: {[c.name for c in device.getConnectedCameras()]}")
            print(f"\t>>> USB speed: {device.getUsbSpeed().name}")
            while not device.isClosed():
                time.sleep(1)

Tags: ,

索引