# Copyright (c) Microsoft Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import pathlib from typing import Dict, Optional, Union, cast from playwright._impl._api_structures import TracingGroupLocation from playwright._impl._artifact import Artifact from playwright._impl._connection import ChannelOwner, from_nullable_channel from playwright._impl._helper import locals_to_params class Tracing(ChannelOwner): def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) self._channel.mark_as_internal_type() self._include_sources: bool = False self._stacks_id: Optional[str] = None self._is_tracing: bool = False self._traces_dir: Optional[str] = None async def start( self, name: str = None, title: str = None, snapshots: bool = None, screenshots: bool = None, sources: bool = None, ) -> None: params = locals_to_params(locals()) self._include_sources = bool(sources) await self._channel.send("tracingStart", params) trace_name = await self._channel.send( "tracingStartChunk", {"title": title, "name": name} ) await self._start_collecting_stacks(trace_name) async def start_chunk(self, title: str = None, name: str = None) -> None: params = locals_to_params(locals()) trace_name = await self._channel.send("tracingStartChunk", params) await self._start_collecting_stacks(trace_name) async def _start_collecting_stacks(self, trace_name: str) -> None: if not self._is_tracing: self._is_tracing = True self._connection.set_is_tracing(True) self._stacks_id = await self._connection.local_utils.tracing_started( self._traces_dir, trace_name ) async def stop_chunk(self, path: Union[pathlib.Path, str] = None) -> None: await self._do_stop_chunk(path) async def stop(self, path: Union[pathlib.Path, str] = None) -> None: await self._do_stop_chunk(path) await self._channel.send("tracingStop") async def _do_stop_chunk(self, file_path: Union[pathlib.Path, str] = None) -> None: self._reset_stack_counter() if not file_path: # Not interested in any artifacts await self._channel.send("tracingStopChunk", {"mode": "discard"}) if self._stacks_id: await self._connection.local_utils.trace_discarded(self._stacks_id) return is_local = not self._connection.is_remote if is_local: result = await self._channel.send_return_as_dict( "tracingStopChunk", {"mode": "entries"} ) await self._connection.local_utils.zip( { "zipFile": str(file_path), "entries": result["entries"], "stacksId": self._stacks_id, "mode": "write", "includeSources": self._include_sources, } ) return result = await self._channel.send_return_as_dict( "tracingStopChunk", { "mode": "archive", }, ) artifact = cast( Optional[Artifact], from_nullable_channel(result.get("artifact")), ) # The artifact may be missing if the browser closed while stopping tracing. if not artifact: if self._stacks_id: await self._connection.local_utils.trace_discarded(self._stacks_id) return # Save trace to the final local file. await artifact.save_as(file_path) await artifact.delete() await self._connection.local_utils.zip( { "zipFile": str(file_path), "entries": [], "stacksId": self._stacks_id, "mode": "append", "includeSources": self._include_sources, } ) def _reset_stack_counter(self) -> None: if self._is_tracing: self._is_tracing = False self._connection.set_is_tracing(False) async def group(self, name: str, location: TracingGroupLocation = None) -> None: await self._channel.send("tracingGroup", locals_to_params(locals())) async def group_end(self) -> None: await self._channel.send("tracingGroupEnd")