# Copyright (c) Microsoft Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio import base64 import inspect import re import sys from pathlib import Path from types import SimpleNamespace from typing import ( TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Pattern, Sequence, Union, cast, ) from playwright._impl._accessibility import Accessibility from playwright._impl._api_structures import ( AriaRole, FilePayload, FloatRect, PdfMargins, Position, ViewportSize, ) from playwright._impl._artifact import Artifact from playwright._impl._clock import Clock from playwright._impl._connection import ( ChannelOwner, from_channel, from_nullable_channel, ) from playwright._impl._console_message import ConsoleMessage from playwright._impl._download import Download from playwright._impl._element_handle import ElementHandle from playwright._impl._errors import Error, TargetClosedError, is_target_closed_error from playwright._impl._event_context_manager import EventContextManagerImpl from playwright._impl._file_chooser import FileChooser from playwright._impl._frame import Frame from playwright._impl._greenlets import LocatorHandlerGreenlet from playwright._impl._har_router import HarRouter from playwright._impl._helper import ( ColorScheme, Contrast, DocumentLoadState, ForcedColors, HarMode, KeyboardModifier, MouseButton, ReducedMotion, RouteFromHarNotFoundPolicy, RouteHandler, RouteHandlerCallback, TimeoutSettings, URLMatch, URLMatchRequest, URLMatchResponse, WebSocketRouteHandlerCallback, async_readfile, async_writefile, locals_to_params, make_dirs_for_file, serialize_error, url_matches, ) from playwright._impl._input import Keyboard, Mouse, Touchscreen from playwright._impl._js_handle import ( JSHandle, Serializable, add_source_url_to_script, parse_result, serialize_argument, ) from playwright._impl._network import ( Request, Response, Route, WebSocketRoute, WebSocketRouteHandler, serialize_headers, ) from playwright._impl._video import Video from playwright._impl._waiter import Waiter if TYPE_CHECKING: # pragma: no cover from playwright._impl._browser_context import BrowserContext from playwright._impl._fetch import APIRequestContext from playwright._impl._locator import FrameLocator, Locator from playwright._impl._network import WebSocket class LocatorHandler: locator: "Locator" handler: Union[Callable[["Locator"], Any], Callable[..., Any]] times: Union[int, None] def __init__( self, locator: "Locator", handler: Callable[..., Any], times: Union[int, None] ) -> None: self.locator = locator self._handler = handler self.times = times def __call__(self) -> Any: arg_count = len(inspect.signature(self._handler).parameters) if arg_count == 0: return self._handler() return self._handler(self.locator) class Page(ChannelOwner): Events = SimpleNamespace( Close="close", Crash="crash", Console="console", Dialog="dialog", Download="download", FileChooser="filechooser", DOMContentLoaded="domcontentloaded", PageError="pageerror", Request="request", Response="response", RequestFailed="requestfailed", RequestFinished="requestfinished", FrameAttached="frameattached", FrameDetached="framedetached", FrameNavigated="framenavigated", Load="load", Popup="popup", WebSocket="websocket", Worker="worker", ) accessibility: Accessibility keyboard: Keyboard mouse: Mouse touchscreen: Touchscreen def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) self._browser_context = cast("BrowserContext", parent) self.accessibility = Accessibility(self._channel) self.keyboard = Keyboard(self._channel) self.mouse = Mouse(self._channel) self.touchscreen = Touchscreen(self._channel) self._main_frame: Frame = from_channel(initializer["mainFrame"]) self._main_frame._page = self self._frames = [self._main_frame] self._viewport_size: Optional[ViewportSize] = initializer.get("viewportSize") self._is_closed = False self._workers: List["Worker"] = [] self._bindings: Dict[str, Any] = {} self._routes: List[RouteHandler] = [] self._web_socket_routes: List[WebSocketRouteHandler] = [] self._owned_context: Optional["BrowserContext"] = None self._timeout_settings: TimeoutSettings = TimeoutSettings( self._browser_context._timeout_settings ) self._video: Optional[Video] = None self._opener = cast("Page", from_nullable_channel(initializer.get("opener"))) self._close_reason: Optional[str] = None self._close_was_called = False self._har_routers: List[HarRouter] = [] self._locator_handlers: Dict[str, LocatorHandler] = {} self._channel.on( "bindingCall", lambda params: self._on_binding(from_channel(params["binding"])), ) self._channel.on("close", lambda _: self._on_close()) self._channel.on("crash", lambda _: self._on_crash()) self._channel.on("download", lambda params: self._on_download(params)) self._channel.on( "fileChooser", lambda params: self.emit( Page.Events.FileChooser, FileChooser( self, from_channel(params["element"]), params["isMultiple"] ), ), ) self._channel.on( "frameAttached", lambda params: self._on_frame_attached(from_channel(params["frame"])), ) self._channel.on( "frameDetached", lambda params: self._on_frame_detached(from_channel(params["frame"])), ) self._channel.on( "locatorHandlerTriggered", lambda params: self._loop.create_task( self._on_locator_handler_triggered(params["uid"]) ), ) self._channel.on( "route", lambda params: self._loop.create_task( self._on_route(from_channel(params["route"])) ), ) self._channel.on( "webSocketRoute", lambda params: self._loop.create_task( self._on_web_socket_route(from_channel(params["webSocketRoute"])) ), ) self._channel.on("video", lambda params: self._on_video(params)) self._channel.on( "webSocket", lambda params: self.emit( Page.Events.WebSocket, from_channel(params["webSocket"]) ), ) self._channel.on( "worker", lambda params: self._on_worker(from_channel(params["worker"])) ) self._closed_or_crashed_future: asyncio.Future = asyncio.Future() self.on( Page.Events.Close, lambda _: ( self._closed_or_crashed_future.set_result( self._close_error_with_reason() ) if not self._closed_or_crashed_future.done() else None ), ) self.on( Page.Events.Crash, lambda _: ( self._closed_or_crashed_future.set_result(TargetClosedError()) if not self._closed_or_crashed_future.done() else None ), ) self._set_event_to_subscription_mapping( { Page.Events.Console: "console", Page.Events.Dialog: "dialog", Page.Events.Request: "request", Page.Events.Response: "response", Page.Events.RequestFinished: "requestFinished", Page.Events.RequestFailed: "requestFailed", Page.Events.FileChooser: "fileChooser", } ) def __repr__(self) -> str: return f"" def _on_frame_attached(self, frame: Frame) -> None: frame._page = self self._frames.append(frame) self.emit(Page.Events.FrameAttached, frame) def _on_frame_detached(self, frame: Frame) -> None: self._frames.remove(frame) frame._detached = True self.emit(Page.Events.FrameDetached, frame) async def _on_route(self, route: Route) -> None: route._context = self.context route_handlers = self._routes.copy() for route_handler in route_handlers: # If the page was closed we stall all requests right away. if self._close_was_called or self.context._close_was_called: return if not route_handler.matches(route.request.url): continue if route_handler not in self._routes: continue if route_handler.will_expire: self._routes.remove(route_handler) try: handled = await route_handler.handle(route) finally: if len(self._routes) == 0: async def _update_interceptor_patterns_ignore_exceptions() -> None: try: await self._update_interception_patterns() except Error: pass asyncio.create_task( self._connection.wrap_api_call( _update_interceptor_patterns_ignore_exceptions, True ) ) if handled: return await self._browser_context._on_route(route) async def _on_web_socket_route(self, web_socket_route: WebSocketRoute) -> None: route_handler = next( ( route_handler for route_handler in self._web_socket_routes if route_handler.matches(web_socket_route.url) ), None, ) if route_handler: await route_handler.handle(web_socket_route) else: await self._browser_context._on_web_socket_route(web_socket_route) def _on_binding(self, binding_call: "BindingCall") -> None: func = self._bindings.get(binding_call._initializer["name"]) if func: asyncio.create_task(binding_call.call(func)) self._browser_context._on_binding(binding_call) def _on_worker(self, worker: "Worker") -> None: self._workers.append(worker) worker._page = self self.emit(Page.Events.Worker, worker) def _on_close(self) -> None: self._is_closed = True if self in self._browser_context._pages: self._browser_context._pages.remove(self) if self in self._browser_context._background_pages: self._browser_context._background_pages.remove(self) self._dispose_har_routers() self.emit(Page.Events.Close, self) def _on_crash(self) -> None: self.emit(Page.Events.Crash, self) def _on_download(self, params: Any) -> None: url = params["url"] suggested_filename = params["suggestedFilename"] artifact = cast(Artifact, from_channel(params["artifact"])) self.emit( Page.Events.Download, Download(self, url, suggested_filename, artifact) ) def _on_video(self, params: Any) -> None: artifact = from_channel(params["artifact"]) self._force_video()._artifact_ready(artifact) @property def context(self) -> "BrowserContext": return self._browser_context @property def clock(self) -> Clock: return self._browser_context.clock async def opener(self) -> Optional["Page"]: if self._opener and self._opener.is_closed(): return None return self._opener @property def main_frame(self) -> Frame: return self._main_frame def frame(self, name: str = None, url: URLMatch = None) -> Optional[Frame]: for frame in self._frames: if name and frame.name == name: return frame if url and url_matches( self._browser_context._options.get("baseURL"), frame.url, url ): return frame return None @property def frames(self) -> List[Frame]: return self._frames.copy() def set_default_navigation_timeout(self, timeout: float) -> None: self._timeout_settings.set_default_navigation_timeout(timeout) self._channel.send_no_reply( "setDefaultNavigationTimeoutNoReply", dict(timeout=timeout) ) def set_default_timeout(self, timeout: float) -> None: self._timeout_settings.set_default_timeout(timeout) self._channel.send_no_reply("setDefaultTimeoutNoReply", dict(timeout=timeout)) async def query_selector( self, selector: str, strict: bool = None, ) -> Optional[ElementHandle]: return await self._main_frame.query_selector(selector, strict) async def query_selector_all(self, selector: str) -> List[ElementHandle]: return await self._main_frame.query_selector_all(selector) async def wait_for_selector( self, selector: str, timeout: float = None, state: Literal["attached", "detached", "hidden", "visible"] = None, strict: bool = None, ) -> Optional[ElementHandle]: return await self._main_frame.wait_for_selector(**locals_to_params(locals())) async def is_checked( self, selector: str, strict: bool = None, timeout: float = None ) -> bool: return await self._main_frame.is_checked(**locals_to_params(locals())) async def is_disabled( self, selector: str, strict: bool = None, timeout: float = None ) -> bool: return await self._main_frame.is_disabled(**locals_to_params(locals())) async def is_editable( self, selector: str, strict: bool = None, timeout: float = None ) -> bool: return await self._main_frame.is_editable(**locals_to_params(locals())) async def is_enabled( self, selector: str, strict: bool = None, timeout: float = None ) -> bool: return await self._main_frame.is_enabled(**locals_to_params(locals())) async def is_hidden( self, selector: str, strict: bool = None, timeout: float = None ) -> bool: return await self._main_frame.is_hidden(**locals_to_params(locals())) async def is_visible( self, selector: str, strict: bool = None, timeout: float = None ) -> bool: return await self._main_frame.is_visible(**locals_to_params(locals())) async def dispatch_event( self, selector: str, type: str, eventInit: Dict = None, timeout: float = None, strict: bool = None, ) -> None: return await self._main_frame.dispatch_event(**locals_to_params(locals())) async def evaluate(self, expression: str, arg: Serializable = None) -> Any: return await self._main_frame.evaluate(expression, arg) async def evaluate_handle( self, expression: str, arg: Serializable = None ) -> JSHandle: return await self._main_frame.evaluate_handle(expression, arg) async def eval_on_selector( self, selector: str, expression: str, arg: Serializable = None, strict: bool = None, ) -> Any: return await self._main_frame.eval_on_selector( selector, expression, arg, strict ) async def eval_on_selector_all( self, selector: str, expression: str, arg: Serializable = None, ) -> Any: return await self._main_frame.eval_on_selector_all(selector, expression, arg) async def add_script_tag( self, url: str = None, path: Union[str, Path] = None, content: str = None, type: str = None, ) -> ElementHandle: return await self._main_frame.add_script_tag(**locals_to_params(locals())) async def add_style_tag( self, url: str = None, path: Union[str, Path] = None, content: str = None ) -> ElementHandle: return await self._main_frame.add_style_tag(**locals_to_params(locals())) async def expose_function(self, name: str, callback: Callable) -> None: await self.expose_binding(name, lambda source, *args: callback(*args)) async def expose_binding( self, name: str, callback: Callable, handle: bool = None ) -> None: if name in self._bindings: raise Error(f'Function "{name}" has been already registered') if name in self._browser_context._bindings: raise Error( f'Function "{name}" has been already registered in the browser context' ) self._bindings[name] = callback await self._channel.send( "exposeBinding", dict(name=name, needsHandle=handle or False) ) async def set_extra_http_headers(self, headers: Dict[str, str]) -> None: await self._channel.send( "setExtraHTTPHeaders", dict(headers=serialize_headers(headers)) ) @property def url(self) -> str: return self._main_frame.url async def content(self) -> str: return await self._main_frame.content() async def set_content( self, html: str, timeout: float = None, waitUntil: DocumentLoadState = None, ) -> None: return await self._main_frame.set_content(**locals_to_params(locals())) async def goto( self, url: str, timeout: float = None, waitUntil: DocumentLoadState = None, referer: str = None, ) -> Optional[Response]: return await self._main_frame.goto(**locals_to_params(locals())) async def reload( self, timeout: float = None, waitUntil: DocumentLoadState = None, ) -> Optional[Response]: return from_nullable_channel( await self._channel.send("reload", locals_to_params(locals())) ) async def wait_for_load_state( self, state: Literal["domcontentloaded", "load", "networkidle"] = None, timeout: float = None, ) -> None: return await self._main_frame.wait_for_load_state(**locals_to_params(locals())) async def wait_for_url( self, url: URLMatch, waitUntil: DocumentLoadState = None, timeout: float = None, ) -> None: return await self._main_frame.wait_for_url(**locals_to_params(locals())) async def wait_for_event( self, event: str, predicate: Callable = None, timeout: float = None ) -> Any: async with self.expect_event(event, predicate, timeout) as event_info: pass return await event_info async def go_back( self, timeout: float = None, waitUntil: DocumentLoadState = None, ) -> Optional[Response]: return from_nullable_channel( await self._channel.send("goBack", locals_to_params(locals())) ) async def go_forward( self, timeout: float = None, waitUntil: DocumentLoadState = None, ) -> Optional[Response]: return from_nullable_channel( await self._channel.send("goForward", locals_to_params(locals())) ) async def request_gc(self) -> None: await self._channel.send("requestGC") async def emulate_media( self, media: Literal["null", "print", "screen"] = None, colorScheme: ColorScheme = None, reducedMotion: ReducedMotion = None, forcedColors: ForcedColors = None, contrast: Contrast = None, ) -> None: params = locals_to_params(locals()) if "media" in params: params["media"] = "no-override" if params["media"] == "null" else media if "colorScheme" in params: params["colorScheme"] = ( "no-override" if params["colorScheme"] == "null" else colorScheme ) if "reducedMotion" in params: params["reducedMotion"] = ( "no-override" if params["reducedMotion"] == "null" else reducedMotion ) if "forcedColors" in params: params["forcedColors"] = ( "no-override" if params["forcedColors"] == "null" else forcedColors ) if "contrast" in params: params["contrast"] = ( "no-override" if params["contrast"] == "null" else contrast ) await self._channel.send("emulateMedia", params) async def set_viewport_size(self, viewportSize: ViewportSize) -> None: self._viewport_size = viewportSize await self._channel.send("setViewportSize", locals_to_params(locals())) @property def viewport_size(self) -> Optional[ViewportSize]: return self._viewport_size async def bring_to_front(self) -> None: await self._channel.send("bringToFront") async def add_init_script( self, script: str = None, path: Union[str, Path] = None ) -> None: if path: script = add_source_url_to_script( (await async_readfile(path)).decode(), path ) if not isinstance(script, str): raise Error("Either path or script parameter must be specified") await self._channel.send("addInitScript", dict(source=script)) async def route( self, url: URLMatch, handler: RouteHandlerCallback, times: int = None ) -> None: self._routes.insert( 0, RouteHandler( self._browser_context._options.get("baseURL"), url, handler, True if self._dispatcher_fiber else False, times, ), ) await self._update_interception_patterns() async def unroute( self, url: URLMatch, handler: Optional[RouteHandlerCallback] = None ) -> None: removed = [] remaining = [] for route in self._routes: if route.url != url or (handler and route.handler != handler): remaining.append(route) else: removed.append(route) await self._unroute_internal(removed, remaining, "default") async def _unroute_internal( self, removed: List[RouteHandler], remaining: List[RouteHandler], behavior: Literal["default", "ignoreErrors", "wait"] = None, ) -> None: self._routes = remaining await self._update_interception_patterns() if behavior is None or behavior == "default": return await asyncio.gather( *map( lambda route: route.stop(behavior), # type: ignore removed, ) ) async def route_web_socket( self, url: URLMatch, handler: WebSocketRouteHandlerCallback ) -> None: self._web_socket_routes.insert( 0, WebSocketRouteHandler( self._browser_context._options.get("baseURL"), url, handler ), ) await self._update_web_socket_interception_patterns() def _dispose_har_routers(self) -> None: for router in self._har_routers: router.dispose() self._har_routers = [] async def unroute_all( self, behavior: Literal["default", "ignoreErrors", "wait"] = None ) -> None: await self._unroute_internal(self._routes, [], behavior) self._dispose_har_routers() async def route_from_har( self, har: Union[Path, str], url: Union[Pattern[str], str] = None, notFound: RouteFromHarNotFoundPolicy = None, update: bool = None, updateContent: Literal["attach", "embed"] = None, updateMode: HarMode = None, ) -> None: if update: await self._browser_context._record_into_har( har=har, page=self, url=url, update_content=updateContent, update_mode=updateMode, ) return router = await HarRouter.create( local_utils=self._connection.local_utils, file=str(har), not_found_action=notFound or "abort", url_matcher=url, ) self._har_routers.append(router) await router.add_page_route(self) async def _update_interception_patterns(self) -> None: patterns = RouteHandler.prepare_interception_patterns(self._routes) await self._channel.send( "setNetworkInterceptionPatterns", {"patterns": patterns} ) async def _update_web_socket_interception_patterns(self) -> None: patterns = WebSocketRouteHandler.prepare_interception_patterns( self._web_socket_routes ) await self._channel.send( "setWebSocketInterceptionPatterns", {"patterns": patterns} ) async def screenshot( self, timeout: float = None, type: Literal["jpeg", "png"] = None, path: Union[str, Path] = None, quality: int = None, omitBackground: bool = None, fullPage: bool = None, clip: FloatRect = None, animations: Literal["allow", "disabled"] = None, caret: Literal["hide", "initial"] = None, scale: Literal["css", "device"] = None, mask: Sequence["Locator"] = None, maskColor: str = None, style: str = None, ) -> bytes: params = locals_to_params(locals()) if "path" in params: del params["path"] if "mask" in params: params["mask"] = list( map( lambda locator: ( { "frame": locator._frame._channel, "selector": locator._selector, } ), params["mask"], ) ) encoded_binary = await self._channel.send("screenshot", params) decoded_binary = base64.b64decode(encoded_binary) if path: make_dirs_for_file(path) await async_writefile(path, decoded_binary) return decoded_binary async def title(self) -> str: return await self._main_frame.title() async def close(self, runBeforeUnload: bool = None, reason: str = None) -> None: self._close_reason = reason self._close_was_called = True try: await self._channel.send("close", locals_to_params(locals())) if self._owned_context: await self._owned_context.close() except Exception as e: if not is_target_closed_error(e) and not runBeforeUnload: raise e def is_closed(self) -> bool: return self._is_closed async def click( self, selector: str, modifiers: Sequence[KeyboardModifier] = None, position: Position = None, delay: float = None, button: MouseButton = None, clickCount: int = None, timeout: float = None, force: bool = None, noWaitAfter: bool = None, trial: bool = None, strict: bool = None, ) -> None: return await self._main_frame.click(**locals_to_params(locals())) async def dblclick( self, selector: str, modifiers: Sequence[KeyboardModifier] = None, position: Position = None, delay: float = None, button: MouseButton = None, timeout: float = None, force: bool = None, noWaitAfter: bool = None, strict: bool = None, trial: bool = None, ) -> None: return await self._main_frame.dblclick(**locals_to_params(locals())) async def tap( self, selector: str, modifiers: Sequence[KeyboardModifier] = None, position: Position = None, timeout: float = None, force: bool = None, noWaitAfter: bool = None, strict: bool = None, trial: bool = None, ) -> None: return await self._main_frame.tap(**locals_to_params(locals())) async def fill( self, selector: str, value: str, timeout: float = None, noWaitAfter: bool = None, strict: bool = None, force: bool = None, ) -> None: return await self._main_frame.fill(**locals_to_params(locals())) def locator( self, selector: str, hasText: Union[str, Pattern[str]] = None, hasNotText: Union[str, Pattern[str]] = None, has: "Locator" = None, hasNot: "Locator" = None, ) -> "Locator": return self._main_frame.locator( selector, hasText=hasText, hasNotText=hasNotText, has=has, hasNot=hasNot, ) def get_by_alt_text( self, text: Union[str, Pattern[str]], exact: bool = None ) -> "Locator": return self._main_frame.get_by_alt_text(text, exact=exact) def get_by_label( self, text: Union[str, Pattern[str]], exact: bool = None ) -> "Locator": return self._main_frame.get_by_label(text, exact=exact) def get_by_placeholder( self, text: Union[str, Pattern[str]], exact: bool = None ) -> "Locator": return self._main_frame.get_by_placeholder(text, exact=exact) def get_by_role( self, role: AriaRole, checked: bool = None, disabled: bool = None, expanded: bool = None, includeHidden: bool = None, level: int = None, name: Union[str, Pattern[str]] = None, pressed: bool = None, selected: bool = None, exact: bool = None, ) -> "Locator": return self._main_frame.get_by_role( role, checked=checked, disabled=disabled, expanded=expanded, includeHidden=includeHidden, level=level, name=name, pressed=pressed, selected=selected, exact=exact, ) def get_by_test_id(self, testId: Union[str, Pattern[str]]) -> "Locator": return self._main_frame.get_by_test_id(testId) def get_by_text( self, text: Union[str, Pattern[str]], exact: bool = None ) -> "Locator": return self._main_frame.get_by_text(text, exact=exact) def get_by_title( self, text: Union[str, Pattern[str]], exact: bool = None ) -> "Locator": return self._main_frame.get_by_title(text, exact=exact) def frame_locator(self, selector: str) -> "FrameLocator": return self.main_frame.frame_locator(selector) async def focus( self, selector: str, strict: bool = None, timeout: float = None ) -> None: return await self._main_frame.focus(**locals_to_params(locals())) async def text_content( self, selector: str, strict: bool = None, timeout: float = None ) -> Optional[str]: return await self._main_frame.text_content(**locals_to_params(locals())) async def inner_text( self, selector: str, strict: bool = None, timeout: float = None ) -> str: return await self._main_frame.inner_text(**locals_to_params(locals())) async def inner_html( self, selector: str, strict: bool = None, timeout: float = None ) -> str: return await self._main_frame.inner_html(**locals_to_params(locals())) async def get_attribute( self, selector: str, name: str, strict: bool = None, timeout: float = None ) -> Optional[str]: return await self._main_frame.get_attribute(**locals_to_params(locals())) async def hover( self, selector: str, modifiers: Sequence[KeyboardModifier] = None, position: Position = None, timeout: float = None, noWaitAfter: bool = None, force: bool = None, strict: bool = None, trial: bool = None, ) -> None: return await self._main_frame.hover(**locals_to_params(locals())) async def drag_and_drop( self, source: str, target: str, sourcePosition: Position = None, targetPosition: Position = None, force: bool = None, noWaitAfter: bool = None, timeout: float = None, strict: bool = None, trial: bool = None, ) -> None: return await self._main_frame.drag_and_drop(**locals_to_params(locals())) async def select_option( self, selector: str, value: Union[str, Sequence[str]] = None, index: Union[int, Sequence[int]] = None, label: Union[str, Sequence[str]] = None, element: Union["ElementHandle", Sequence["ElementHandle"]] = None, timeout: float = None, noWaitAfter: bool = None, force: bool = None, strict: bool = None, ) -> List[str]: params = locals_to_params(locals()) return await self._main_frame.select_option(**params) async def input_value( self, selector: str, strict: bool = None, timeout: float = None ) -> str: params = locals_to_params(locals()) return await self._main_frame.input_value(**params) async def set_input_files( self, selector: str, files: Union[ str, Path, FilePayload, Sequence[Union[str, Path]], Sequence[FilePayload] ], timeout: float = None, strict: bool = None, noWaitAfter: bool = None, ) -> None: return await self._main_frame.set_input_files(**locals_to_params(locals())) async def type( self, selector: str, text: str, delay: float = None, timeout: float = None, noWaitAfter: bool = None, strict: bool = None, ) -> None: return await self._main_frame.type(**locals_to_params(locals())) async def press( self, selector: str, key: str, delay: float = None, timeout: float = None, noWaitAfter: bool = None, strict: bool = None, ) -> None: return await self._main_frame.press(**locals_to_params(locals())) async def check( self, selector: str, position: Position = None, timeout: float = None, force: bool = None, noWaitAfter: bool = None, strict: bool = None, trial: bool = None, ) -> None: return await self._main_frame.check(**locals_to_params(locals())) async def uncheck( self, selector: str, position: Position = None, timeout: float = None, force: bool = None, noWaitAfter: bool = None, strict: bool = None, trial: bool = None, ) -> None: return await self._main_frame.uncheck(**locals_to_params(locals())) async def wait_for_timeout(self, timeout: float) -> None: await self._main_frame.wait_for_timeout(timeout) async def wait_for_function( self, expression: str, arg: Serializable = None, timeout: float = None, polling: Union[float, Literal["raf"]] = None, ) -> JSHandle: return await self._main_frame.wait_for_function(**locals_to_params(locals())) @property def workers(self) -> List["Worker"]: return self._workers.copy() @property def request(self) -> "APIRequestContext": return self.context.request async def pause(self) -> None: default_navigation_timeout = ( self._browser_context._timeout_settings.default_navigation_timeout() ) default_timeout = self._browser_context._timeout_settings.default_timeout() self._browser_context.set_default_navigation_timeout(0) self._browser_context.set_default_timeout(0) try: await asyncio.wait( [ asyncio.create_task(self._browser_context._channel.send("pause")), self._closed_or_crashed_future, ], return_when=asyncio.FIRST_COMPLETED, ) finally: self._browser_context._set_default_navigation_timeout_impl( default_navigation_timeout ) self._browser_context._set_default_timeout_impl(default_timeout) async def pdf( self, scale: float = None, displayHeaderFooter: bool = None, headerTemplate: str = None, footerTemplate: str = None, printBackground: bool = None, landscape: bool = None, pageRanges: str = None, format: str = None, width: Union[str, float] = None, height: Union[str, float] = None, preferCSSPageSize: bool = None, margin: PdfMargins = None, path: Union[str, Path] = None, outline: bool = None, tagged: bool = None, ) -> bytes: params = locals_to_params(locals()) if "path" in params: del params["path"] encoded_binary = await self._channel.send("pdf", params) decoded_binary = base64.b64decode(encoded_binary) if path: make_dirs_for_file(path) await async_writefile(path, decoded_binary) return decoded_binary def _force_video(self) -> Video: if not self._video: self._video = Video(self) return self._video @property def video( self, ) -> Optional[Video]: # Note: we are creating Video object lazily, because we do not know # BrowserContextOptions when constructing the page - it is assigned # too late during launchPersistentContext. if not self._browser_context._options.get("recordVideo"): return None return self._force_video() def _close_error_with_reason(self) -> TargetClosedError: return TargetClosedError( self._close_reason or self._browser_context._effective_close_reason() ) def expect_event( self, event: str, predicate: Callable = None, timeout: float = None, ) -> EventContextManagerImpl: return self._expect_event( event, predicate, timeout, f'waiting for event "{event}"' ) def _expect_event( self, event: str, predicate: Callable = None, timeout: float = None, log_line: str = None, ) -> EventContextManagerImpl: if timeout is None: timeout = self._timeout_settings.timeout() waiter = Waiter(self, f"page.expect_event({event})") waiter.reject_on_timeout( timeout, f'Timeout {timeout}ms exceeded while waiting for event "{event}"' ) if log_line: waiter.log(log_line) if event != Page.Events.Crash: waiter.reject_on_event(self, Page.Events.Crash, Error("Page crashed")) if event != Page.Events.Close: waiter.reject_on_event( self, Page.Events.Close, lambda: self._close_error_with_reason() ) waiter.wait_for_event(self, event, predicate) return EventContextManagerImpl(waiter.result()) def expect_console_message( self, predicate: Callable[[ConsoleMessage], bool] = None, timeout: float = None, ) -> EventContextManagerImpl[ConsoleMessage]: return self.expect_event(Page.Events.Console, predicate, timeout) def expect_download( self, predicate: Callable[[Download], bool] = None, timeout: float = None, ) -> EventContextManagerImpl[Download]: return self.expect_event(Page.Events.Download, predicate, timeout) def expect_file_chooser( self, predicate: Callable[[FileChooser], bool] = None, timeout: float = None, ) -> EventContextManagerImpl[FileChooser]: return self.expect_event(Page.Events.FileChooser, predicate, timeout) def expect_navigation( self, url: URLMatch = None, waitUntil: DocumentLoadState = None, timeout: float = None, ) -> EventContextManagerImpl[Response]: return self.main_frame.expect_navigation(url, waitUntil, timeout) def expect_popup( self, predicate: Callable[["Page"], bool] = None, timeout: float = None, ) -> EventContextManagerImpl["Page"]: return self.expect_event(Page.Events.Popup, predicate, timeout) def expect_request( self, urlOrPredicate: URLMatchRequest, timeout: float = None, ) -> EventContextManagerImpl[Request]: def my_predicate(request: Request) -> bool: if not callable(urlOrPredicate): return url_matches( self._browser_context._options.get("baseURL"), request.url, urlOrPredicate, ) return urlOrPredicate(request) trimmed_url = trim_url(urlOrPredicate) log_line = f"waiting for request {trimmed_url}" if trimmed_url else None return self._expect_event( Page.Events.Request, predicate=my_predicate, timeout=timeout, log_line=log_line, ) def expect_request_finished( self, predicate: Callable[["Request"], bool] = None, timeout: float = None, ) -> EventContextManagerImpl[Request]: return self.expect_event( Page.Events.RequestFinished, predicate=predicate, timeout=timeout ) def expect_response( self, urlOrPredicate: URLMatchResponse, timeout: float = None, ) -> EventContextManagerImpl[Response]: def my_predicate(request: Response) -> bool: if not callable(urlOrPredicate): return url_matches( self._browser_context._options.get("baseURL"), request.url, urlOrPredicate, ) return urlOrPredicate(request) trimmed_url = trim_url(urlOrPredicate) log_line = f"waiting for response {trimmed_url}" if trimmed_url else None return self._expect_event( Page.Events.Response, predicate=my_predicate, timeout=timeout, log_line=log_line, ) def expect_websocket( self, predicate: Callable[["WebSocket"], bool] = None, timeout: float = None, ) -> EventContextManagerImpl["WebSocket"]: return self.expect_event("websocket", predicate, timeout) def expect_worker( self, predicate: Callable[["Worker"], bool] = None, timeout: float = None, ) -> EventContextManagerImpl["Worker"]: return self.expect_event("worker", predicate, timeout) async def set_checked( self, selector: str, checked: bool, position: Position = None, timeout: float = None, force: bool = None, noWaitAfter: bool = None, strict: bool = None, trial: bool = None, ) -> None: if checked: await self.check( selector=selector, position=position, timeout=timeout, force=force, strict=strict, trial=trial, ) else: await self.uncheck( selector=selector, position=position, timeout=timeout, force=force, strict=strict, trial=trial, ) async def add_locator_handler( self, locator: "Locator", handler: Union[Callable[["Locator"], Any], Callable[[], Any]], noWaitAfter: bool = None, times: int = None, ) -> None: if locator._frame != self._main_frame: raise Error("Locator must belong to the main frame of this page") if times == 0: return uid = await self._channel.send( "registerLocatorHandler", { "selector": locator._selector, "noWaitAfter": noWaitAfter, }, ) self._locator_handlers[uid] = LocatorHandler( handler=handler, times=times, locator=locator ) async def _on_locator_handler_triggered(self, uid: str) -> None: remove = False try: handler = self._locator_handlers.get(uid) if handler and handler.times != 0: if handler.times is not None: handler.times -= 1 if self._dispatcher_fiber: handler_finished_future = self._loop.create_future() def _handler() -> None: try: handler() handler_finished_future.set_result(None) except Exception as e: handler_finished_future.set_exception(e) g = LocatorHandlerGreenlet(_handler) g.switch() await handler_finished_future else: coro_or_future = handler() if coro_or_future: await coro_or_future remove = handler.times == 0 finally: if remove: del self._locator_handlers[uid] try: await self._connection.wrap_api_call( lambda: self._channel.send( "resolveLocatorHandlerNoReply", {"uid": uid, "remove": remove} ), is_internal=True, ) except Error: pass async def remove_locator_handler(self, locator: "Locator") -> None: for uid, data in self._locator_handlers.copy().items(): if data.locator._equals(locator): del self._locator_handlers[uid] self._channel.send_no_reply("unregisterLocatorHandler", {"uid": uid}) class Worker(ChannelOwner): Events = SimpleNamespace(Close="close") def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) self._channel.on("close", lambda _: self._on_close()) self._page: Optional[Page] = None self._context: Optional["BrowserContext"] = None def __repr__(self) -> str: return f"" def _on_close(self) -> None: if self._page: self._page._workers.remove(self) if self._context: self._context._service_workers.remove(self) self.emit(Worker.Events.Close, self) @property def url(self) -> str: return self._initializer["url"] async def evaluate(self, expression: str, arg: Serializable = None) -> Any: return parse_result( await self._channel.send( "evaluateExpression", dict( expression=expression, arg=serialize_argument(arg), ), ) ) async def evaluate_handle( self, expression: str, arg: Serializable = None ) -> JSHandle: return from_channel( await self._channel.send( "evaluateExpressionHandle", dict( expression=expression, arg=serialize_argument(arg), ), ) ) class BindingCall(ChannelOwner): def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) async def call(self, func: Callable) -> None: try: frame = from_channel(self._initializer["frame"]) source = dict(context=frame._page.context, page=frame._page, frame=frame) if self._initializer.get("handle"): result = func(source, from_channel(self._initializer["handle"])) else: func_args = list(map(parse_result, self._initializer["args"])) result = func(source, *func_args) if inspect.iscoroutine(result): result = await result await self._channel.send("resolve", dict(result=serialize_argument(result))) except Exception as e: tb = sys.exc_info()[2] asyncio.create_task( self._channel.send( "reject", dict(error=dict(error=serialize_error(e, tb))) ) ) def trim_url(param: Union[URLMatchRequest, URLMatchResponse]) -> Optional[str]: if isinstance(param, re.Pattern): return trim_end(param.pattern) if isinstance(param, str): return trim_end(param) return None def trim_end(s: str) -> str: if len(s) > 50: return s[:50] + "\u2026" return s