| # This module should be imported from REPL, not run from command line. |
| import socket |
| import uos |
| import network |
| import uwebsocket |
| import websocket_helper |
| import _webrepl |
| |
| listen_s = None |
| client_s = None |
| |
| def setup_conn(port, accept_handler): |
| global listen_s |
| listen_s = socket.socket() |
| listen_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| |
| ai = socket.getaddrinfo("0.0.0.0", port) |
| addr = ai[0][4] |
| |
| listen_s.bind(addr) |
| listen_s.listen(1) |
| if accept_handler: |
| listen_s.setsockopt(socket.SOL_SOCKET, 20, accept_handler) |
| for i in (network.AP_IF, network.STA_IF): |
| iface = network.WLAN(i) |
| if iface.active(): |
| print("WebREPL daemon started on ws://%s:%d" % (iface.ifconfig()[0], port)) |
| return listen_s |
| |
| |
| def accept_conn(listen_sock): |
| global client_s |
| cl, remote_addr = listen_sock.accept() |
| prev = uos.dupterm(None) |
| uos.dupterm(prev) |
| if prev: |
| print("\nConcurrent WebREPL connection from", remote_addr, "rejected") |
| cl.close() |
| return |
| print("\nWebREPL connection from:", remote_addr) |
| client_s = cl |
| websocket_helper.server_handshake(cl) |
| ws = uwebsocket.websocket(cl, True) |
| ws = _webrepl._webrepl(ws) |
| cl.setblocking(False) |
| # notify REPL on socket incoming data (ESP32/ESP8266-only) |
| if hasattr(uos, 'dupterm_notify'): |
| cl.setsockopt(socket.SOL_SOCKET, 20, uos.dupterm_notify) |
| uos.dupterm(ws) |
| |
| |
| def stop(): |
| global listen_s, client_s |
| uos.dupterm(None) |
| if client_s: |
| client_s.close() |
| if listen_s: |
| listen_s.close() |
| |
| |
| def start(port=8266, password=None): |
| stop() |
| if password is None: |
| try: |
| import webrepl_cfg |
| _webrepl.password(webrepl_cfg.PASS) |
| setup_conn(port, accept_conn) |
| print("Started webrepl in normal mode") |
| except: |
| print("WebREPL is not configured, run 'import webrepl_setup'") |
| else: |
| _webrepl.password(password) |
| setup_conn(port, accept_conn) |
| print("Started webrepl in manual override mode") |
| |
| |
| def start_foreground(port=8266): |
| stop() |
| s = setup_conn(port, None) |
| accept_conn(s) |