前言
最近有个需求是做web自动化脚本录制,简单来说就是点击页面上的一个元素,就能把对应节点的xpath和自动化语法添加到脚本中。在airtest中,已经实现了这个功能,这里不再演示,通过实际使用,我认为这一技术实现的关键在于devtools,即通过与浏览器devtools的交互,实现当前节点的选取与脚本录制。
目前网上能搜索到的文章有限,本文将详细描述与自动化相关的devtools协议的研究与demo实现。
正文
首先是去寻找下devtools的文档。CDP协议,全程Chrome DevTools Protocol,是用来检测、检查、调试和分析 Chromium、Chrome 和其他基于 Blink 的浏览器。通过开启调试端口,就能与devtools进行通信,从而实现事件与监听。
开始
首先使用chrome.exe --remote-debugging-port=9222
开启chrome的远程调试端口。也可以加上--headless
启用无头浏览器。
当启动完成后,访问http://localhost:9222/json
即可获取到可交互的devtools地址。devtools采用websocket协议进行通信,通过与webSocketDebuggerUrl进行websocket连接即可与devtools进行通信。
到这,准备工作已经完成,现在需要分析需要的devtools协议。
使用原有工具观察协议
相信各位都使用过devtools的inspect模式,这个模式能高亮选中的元素,并且当点击这个元素后,elements标签里会跳到对应节点的html代码处。
要观察devtools协议传输了什么,可以通过打开devtools前端的协议观察器。在devtools的试验设置里可以打开。打开后,在更多里打开协议监听器。
点击inspect,选中页面上的元素,即可出现相关的协议请求,包括请求方法,请求参数,还有响应。
数据格式
当然,知道了通信具体参数并不代表可以直接发送数据了,我们还需了解交互的格式,在官方文档中,我们找到了一份getting-start,这份文档用js写了份示例,根据示例我们可以清楚看到websocket交互的请求体。
1 2 3 4 5 6 7
| { "id": 1, "method": "", "params": {
} }
|
其中id作为请求的唯一标识,用于让请求方标识返回的内容是对应哪个请求的,当devtools返回时,会将此id返回,因此当需要当出现多个请求同时发出时,需要确保id唯一。
method字段对应响应的方法,而params字段则是方法对应的参数。
最后发送的是json数据的stringfy。
大体实现
回到protocol monitor,我们可以发现当启用inspect模式时,会发送Overlay.setInspectMode
方法,开启检查模式。虽然也可以在官方文档中看到,但是官方文档方法实在太多了,不如这种方式来的直接。
其次在页面上选中元素时,会接收一个Overlay.nodeHighlightRequested
事件,会返回一个nodeId,而当在页面上点击一个元素时会接收一个Overlay.inspectNodeRequested
事件,返回一个backendNodeId。
而无论是nodeId还是backendNodeId,都可以通过DOM.describeNode
方法发送指定参数的请求,获取这个节点的详细信息。这个方法的参数如下:
1 2 3 4 5 6 7
| { "nodeId": "", "backendNodeId": "", "objectId": "", "depth": "", "pierce": "" }
|
实际使用中,我们只需要发送nodeId或者backendNodeId即可。
其他实现自动化需要的事件还有DOM.documentUpdated
,这个事件说明dom被更新了,需要重新获取dom,否则前后端的节点会不一致,可以通过DOM.getDocument
方法重新获取dom,顺带一提,连接完成后,也应该执行一次这个方法。
至此,我们已经可以完成一个简单的自动化节点监控。下面简单写下代码。
简易实现代码
这里我们使用websocket-client作为websocket的客户端连接到devtools。
先从http://127.0.0.1:9222/json
获取到webSocketDebuggerUrl。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| send_msg_template = { "id": 1, "method": "", "params": {
} } searchForNode = { "mode": "searchForNode", "highlightConfig": { "showInfo": True, "showRulers": False, "showStyles": False, "showAccessibilityInfo": True, "showExtensionLines": False, "contentColor": { "r": 111, "g": 168, "b": 220, "a": 0.66 } } } disableSearchForNode = deepcopy(searchForNode) disableSearchForNode['mode'] = "none" def main():
def on_message(ws, msg): data = json. loads(msg) pprint(data) method = data['method'] if method == 'Overlay.inspectNodeRequested': backend_node_id = data["params"]['backendNodeId'] send_msg(ws, "DOM.describeNode", {'backendNodeId': backend_node_id}) send_msg(ws, "Overlay.setInspectMode", disableSearchForNode) elif method == "Overlay.nodeHighlightRequested": send_msg(ws, "DOM.describeNode", {'nodeId': data['params']['nodeId']}) elif method == "DOM.documentUpdated": send_msg(ws, "DOM.getDocument")
def send_msg(ws, method, params=None): send_msg_template['id'] += 1 msg = deepcopy(send_msg_template) msg['method'] = method msg['params'] = params if params else {} ws.send(json.dumps(msg))
def on_open(ws): send_msg(ws, "DOM.enable") send_msg(ws, "Overlay.enable") send_msg(ws, "Inspector.enable") send_msg(ws, "DOM.getDocument") send_msg(ws, "Overlay.setInspectMode", searchForNode)
from websocket import WebSocketApp ws = WebSocketApp("ws://127.0.0.1:9222/devtools/page/EA63C0AE33770FD58FB89FD3A6ACB165", on_message=on_message, on_open=on_open)
ws.run_forever()
if __name__ == "__main__": t = threading.Thread(target=main) t.start() t.join()
|
这里我们在连接完成后,立即发送几个方法,分别是DOM.enable
,Overlay.enable
,Inspector.enable
,DOM.getDocument
,Overlay.setInspectMode
,其中几个enable是开启对应的功能,这里不再赘述,然后是立即获取一次dom,随后开启inspect模式,这里我们不需要浏览器中devtools的inspect那么多显示,如元素的padding,因此我们只保留最基础的设置。
随后我们在收到消息后就监听几个事件,分别是我们前文所讲的几个事件,随后在节点被按下后,取消inspect模式。
可以看看效果
这里当我们移动到一个元素上时,会打印出该节点的详细信息,在按下节点后,inspect模式随即解除。
使用第三方库监听
当然,我们也可以专注于事件的监听和方法的发送,无需关注websocket的实现,pychrome就是这样的一个库,它的核心代码不过二百多行,它可以通过设置listener来实现指定方法的监听,根据官方示例,我们可以简单改造下上面的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import pychrome browser = pychrome.Browser(url="http://127.0.0.1:9222") tab = browser.list_tab()[0]
def get_node(**kwargs): result = tab.call_method("DOM.describeNode", **kwargs) pprint(result)
def get_node_and_stop_inspect(**kwargs): result = tab.call_method("DOM.describeNode", **kwargs) pprint(result) tab.call_method("Overlay.setInspectMode", **disableSearchForNode)
def update_dom(**kwargs): tab.call_method("DOM.getDocument")
tab.set_listener("Overlay.inspectNodeRequested", get_node_and_stop_inspect) tab.set_listener("Overlay.nodeHighlightRequested", get_node) tab.set_listener("DOM.documentUpdated", update_dom)
tab.start() tab.call_method("DOM.enable") tab.call_method("Overlay.enable") tab.call_method("Inspector.enable") tab.call_method("DOM.getDocument") tab.call_method("Overlay.setInspectMode", **searchForNode)
tab.wait(10) tab.stop()
|
改造完成后,瞬间就很清爽,我们只需要对指定事件监听即可,脚本运行的情况与之前脚本相同,这里就不再演示。
唯一遗憾的是pychrome不支持一个event监听多个方法,新方法会覆盖旧方法。之后有机会提下pr吧。
获取节点xpath
至此,我们的自动化录制已经实现了一大部分,那么剩下就是获取这个节点的xpath,唯一确定这个节点的位置,就可以在selenium准确点击这个元素了。
回到刚刚的DOM.describeNode
方法,这个方法的返回如下
1 2 3 4 5 6 7 8 9
| {'node': {'attributes': ['href', 'https://www.iana.org/domains/example'], 'backendNodeId': 14, 'childNodeCount': 1, 'localName': 'a', 'nodeId': 0, 'nodeName': 'A', 'nodeType': 1, 'nodeValue': ''}}
|
我们知道这个节点的名称,attributes,但是我们却没有结构,来完成路径组装。其实DOM.getDocument
是可以获取到完整结构树的,如果没有参数传递,那么最多只会获取1层的子树,我们可以指定depth参数为-1获取整个document结构。
有了整个dom树,那么我们就可以从上到下遍历树,从而获取节点的路径,我们也无需再次通过DOM.describeNode
获取节点的数据。代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| dom_root = tab.call_method("DOM.getDocument", depth=-1)['root'] def get_path_from_root(**kwargs): backend_node_id = kwargs.get("backendNodeId") path = _get_path(dom_root, backend_node_id, "") print("path: " + str(path))
def _get_path(node, backend_node_id, path): """ 递归获取结构 :param node: :param backend_node_id: :param path :return: """ if not node: return None if node['backendNodeId'] == backend_node_id: return path + "/" + node['localName'] else: for child in node.get('children', []): child_path = _get_path(child, backend_node_id, path + "/" + node['localName']) if child_path: return child_path return None
|
我们点击页面上超链接,得到path: //html/body/div/p/a
,这里前面多个/符号是因为root没有名称,获取后做下处理即可。下面的代码会改进。
当然只用路径肯定不够,当有id时,我们优先使用id来实现,顺便改造下之前的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| def _get_path(node, backend_node_id, path): """ 递归获取结构 :param node: :param backend_node_id: :param path :return: """ if not node: return None node_attributes = node.get('attributes', []) node_id = None for i in range(0, len(node_attributes), 2): if node_attributes[i] == "id": node_id = node_attributes[i + 1] if node_id: new_path = "//*[@id=\"{}\"]".format(node_id) else: new_path = (path + "/" + node['localName']) if node['localName'] else path if node['backendNodeId'] == backend_node_id: return new_path else: for child in node.get('children', []): child_path = _get_path(child, backend_node_id, new_path) if child_path: return child_path return None
|
也许是完善?
当然这个获取xpath的代码还不完全,当节点有其他attr时也可以以这些属性作为xpath,并且还需要考虑同级别下有相同节点,此时需要用序号来标识。
并且通过id获取xpath也不能确保准确,因为网站的编写者可能重复使用id。(CV程序员)
例如,如下的html文件,我们在devtools的element里选中第一个p元素后右键复制XPath后,显示的path是//*[@id="1"]
,但这明显是有问题的,虽然是网页编写者的问题,但我们为了兼容所有系统,可以使用完整的path作为真正的xpath。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <html>
<head> <title>Test</title> </head>
<body> <div id="app"> <p id='1'>test</p> <p id='2'>test</p> </div> <div id="app"> <p id='1'>test</p> <p id='2'>test</p> </div>
</body>
</html>
|
基于以上观点,我们在这不使用带有attribute的xpath,只使用完整的xpath路径作为节点的唯一路径。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| def _get_path(node, backend_node_id, path, idx=1): """ 递归获取结构 :param node: :param backend_node_id: :param path :return: """ if not node: return None new_path = (path + "/" + node['localName']) if node['localName'] else path if idx != 1 and new_path: new_path = new_path + "[{}]".format(idx) if node['backendNodeId'] == backend_node_id: return new_path else: children = node.get('children', []) tag_map = {} for i in range(len(children)): child = children[i] if child['localName'] in tag_map: tag_map[child['localName']] += 1 else: tag_map[child['localName']] = 1 child_path = _get_path(child, backend_node_id, new_path, tag_map[child['localName']]) if child_path: return child_path
return None
|
通过以上代码我们已经能初步实现步骤的自动化录制。
总结
本文就网页自动化录制提出了自己的一些思路与代码实现,初步实现了自动化录制。当然,我相信仍旧有许多实际问题没有被我发现,只能等之后的具体开发才能发现会有什么问题。
其次,就airtest的功能来看,它认为节点的id不会重复,并且attribute大概率不会重复,因此它的xpath是多样的。
相较于airtest的录制功能,这篇文章最后代码使用的是完整的xpath路径,保证路径唯一。