From a504b9377495899ac8f0bbda40d694ce2950b51c Mon Sep 17 00:00:00 2001 From: Lau Ching Jun Date: Wed, 2 Feb 2022 13:55:22 -0800 Subject: [PATCH] Addresses the feedbacks in #95738 (#97457) --- .../lib/src/commands/daemon.dart | 32 +++- packages/flutter_tools/lib/src/daemon.dart | 113 +++++++------ packages/flutter_tools/lib/src/device.dart | 43 ++--- .../commands.shard/hermetic/daemon_test.dart | 154 ++++++++++++++++++ 4 files changed, 260 insertions(+), 82 deletions(-) diff --git a/packages/flutter_tools/lib/src/commands/daemon.dart b/packages/flutter_tools/lib/src/commands/daemon.dart index 459b872b437..5efcbf59c77 100644 --- a/packages/flutter_tools/lib/src/commands/daemon.dart +++ b/packages/flutter_tools/lib/src/commands/daemon.dart @@ -1340,12 +1340,38 @@ class ProxyDomain extends Domain { Future connect(Map args) async { final int targetPort = _getIntArg(args, 'port', required: true); final String id = 'portForwarder_${targetPort}_${_id++}'; - final Socket socket = await Socket.connect('127.0.0.1', targetPort); + + Socket socket; + + try { + socket = await Socket.connect(InternetAddress.loopbackIPv4, targetPort); + } on SocketException { + globals.logger.printTrace('Connecting to localhost:$targetPort failed with IPv4'); + } + + try { + // If connecting to IPv4 loopback interface fails, try IPv6. + socket ??= await Socket.connect(InternetAddress.loopbackIPv6, targetPort); + } on SocketException { + globals.logger.printError('Connecting to localhost:$targetPort failed'); + } + + if (socket == null) { + throw Exception('Failed to connect to the port'); + } + _forwardedConnections[id] = socket; socket.listen((List data) { sendEvent('proxy.data.$id', null, data); + }, onError: (dynamic error, StackTrace stackTrace) { + // Socket error, probably disconnected. + globals.logger.printTrace('Socket error: $error, $stackTrace'); }); - unawaited(socket.done.then((dynamic _) { + + unawaited(socket.done.catchError((dynamic error, StackTrace stackTrace) { + // Socket error, probably disconnected. + globals.logger.printTrace('Socket error: $error, $stackTrace'); + }).then((dynamic _) { sendEvent('proxy.disconnected.$id'); })); return id; @@ -1376,7 +1402,7 @@ class ProxyDomain extends Domain { @override Future dispose() async { for (final Socket connection in _forwardedConnections.values) { - await connection.close(); + connection.destroy(); } await _tempDirectory?.delete(recursive: true); } diff --git a/packages/flutter_tools/lib/src/daemon.dart b/packages/flutter_tools/lib/src/daemon.dart index 4a3fe3ab837..1102c2b7270 100644 --- a/packages/flutter_tools/lib/src/daemon.dart +++ b/packages/flutter_tools/lib/src/daemon.dart @@ -98,39 +98,11 @@ class DaemonInputStreamConverter { // Processes a single chunk received in the input stream. void _processChunk(List chunk) { - const int LF = 10; // The '\n' character int start = 0; while (start < chunk.length) { if (state == _InputStreamParseState.json) { - // Search for newline character. - final int indexOfNewLine = chunk.indexOf(LF, start); - if (indexOfNewLine < 0) { - bytesBuilder.add(chunk.sublist(start)); - start = chunk.length; - } else { - bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1)); - start = indexOfNewLine + 1; - - // Process chunk here - final Uint8List combinedChunk = bytesBuilder.takeBytes(); - String jsonString = utf8.decode(combinedChunk).trim(); - if (jsonString.startsWith('[{') && jsonString.endsWith('}]')) { - jsonString = jsonString.substring(1, jsonString.length - 1); - final Map? value = castStringKeyedMap(json.decode(jsonString)); - if (value != null) { - // Check if we need to consume another binary blob. - if (value[_binaryLengthKey] != null) { - remainingBinaryLength = value[_binaryLengthKey]! as int; - currentBinaryStream = StreamController>(); - state = _InputStreamParseState.binary; - _controller.add(DaemonMessage(value, currentBinaryStream.stream)); - } else { - _controller.add(DaemonMessage(value)); - } - } - } - } + start += _processChunkInJsonMode(chunk, start); } else if (state == _InputStreamParseState.binary) { final int bytesSent = _addBinaryChunk(chunk, start, remainingBinaryLength); start += bytesSent; @@ -146,6 +118,41 @@ class DaemonInputStreamConverter { } } + /// Processes a chunk in JSON mode, and returns the number of bytes processed. + int _processChunkInJsonMode(List chunk, int start) { + const int LF = 10; // The '\n' character + + // Search for newline character. + final int indexOfNewLine = chunk.indexOf(LF, start); + if (indexOfNewLine < 0) { + bytesBuilder.add(chunk.sublist(start)); + return chunk.length - start; + } + + bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1)); + + // Process chunk here + final Uint8List combinedChunk = bytesBuilder.takeBytes(); + String jsonString = utf8.decode(combinedChunk).trim(); + if (jsonString.startsWith('[{') && jsonString.endsWith('}]')) { + jsonString = jsonString.substring(1, jsonString.length - 1); + final Map? value = castStringKeyedMap(json.decode(jsonString)); + if (value != null) { + // Check if we need to consume another binary blob. + if (value[_binaryLengthKey] != null) { + remainingBinaryLength = value[_binaryLengthKey]! as int; + currentBinaryStream = StreamController>(); + state = _InputStreamParseState.binary; + _controller.add(DaemonMessage(value, currentBinaryStream.stream)); + } else { + _controller.add(DaemonMessage(value)); + } + } + } + + return indexOfNewLine + 1 - start; + } + int _addBinaryChunk(List chunk, int start, int maximumSizeToRead) { if (start == 0 && chunk.length <= remainingBinaryLength) { currentBinaryStream.add(chunk); @@ -170,6 +177,32 @@ class DaemonStreams { inputStream = DaemonInputStreamConverter(rawInputStream).convertedStream, _logger = logger; + /// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams. + DaemonStreams.fromStdio(Stdio stdio, { required Logger logger }) + : this(stdio.stdin, stdio.stdout, logger: logger); + + /// Creates a [DaemonStreams] that uses [Socket] as the underlying streams. + DaemonStreams.fromSocket(Socket socket, { required Logger logger }) + : this(socket, socket, logger: logger); + + /// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams. + factory DaemonStreams.connect(String host, int port, { required Logger logger }) { + final Future socketFuture = Socket.connect(host, port); + final StreamController> inputStreamController = StreamController>(); + final StreamController> outputStreamController = StreamController>(); + socketFuture.then((Socket socket) { + inputStreamController.addStream(socket); + socket.addStream(outputStreamController.stream); + }).onError((Object error, StackTrace stackTrace) { + logger.printError('Socket error: $error'); + logger.printTrace('$stackTrace'); + // Propagate the error to the streams. + inputStreamController.addError(error, stackTrace); + unawaited(outputStreamController.close()); + }); + return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger); + } + final StreamSink> _outputSink; final Logger _logger; @@ -197,28 +230,6 @@ class DaemonStreams { Future dispose() async { unawaited(_outputSink.close()); } - - /// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams. - static DaemonStreams fromStdio(Stdio stdio, { required Logger logger }) { - return DaemonStreams(stdio.stdin, stdio.stdout, logger: logger); - } - - /// Creates a [DaemonStreams] that uses [Socket] as the underlying streams. - static DaemonStreams fromSocket(Socket socket, { required Logger logger }) { - return DaemonStreams(socket, socket, logger: logger); - } - - /// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams. - static DaemonStreams connect(String host, int port, { required Logger logger }) { - final Future socketFuture = Socket.connect(host, port); - final StreamController> inputStreamController = StreamController>(); - final StreamController> outputStreamController = StreamController>(); - socketFuture.then((Socket socket) { - inputStreamController.addStream(socket); - socket.addStream(outputStreamController.stream); - }); - return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger); - } } /// Connection between a flutter daemon and a client. diff --git a/packages/flutter_tools/lib/src/device.dart b/packages/flutter_tools/lib/src/device.dart index 97140e69a72..6b0800b47de 100644 --- a/packages/flutter_tools/lib/src/device.dart +++ b/packages/flutter_tools/lib/src/device.dart @@ -39,15 +39,11 @@ class Category { String toString() => value; static Category? fromString(String category) { - switch (category) { - case 'web': - return web; - case 'desktop': - return desktop; - case 'mobile': - return mobile; - } - return null; + return { + 'web': web, + 'desktop': desktop, + 'mobile': mobile, + }[category]; } } @@ -70,25 +66,16 @@ class PlatformType { String toString() => value; static PlatformType? fromString(String platformType) { - switch (platformType) { - case 'web': - return web; - case 'android': - return android; - case 'ios': - return ios; - case 'linux': - return linux; - case 'macos': - return macos; - case 'windows': - return windows; - case 'fuchsia': - return fuchsia; - case 'custom': - return custom; - } - return null; + return { + 'web': web, + 'android': android, + 'ios': ios, + 'linux': linux, + 'macos': macos, + 'windows': windows, + 'fuchsia': fuchsia, + 'custom': custom, + }[platformType]; } } diff --git a/packages/flutter_tools/test/commands.shard/hermetic/daemon_test.dart b/packages/flutter_tools/test/commands.shard/hermetic/daemon_test.dart index 13983883105..9e4c345c592 100644 --- a/packages/flutter_tools/test/commands.shard/hermetic/daemon_test.dart +++ b/packages/flutter_tools/test/commands.shard/hermetic/daemon_test.dart @@ -11,6 +11,8 @@ @Tags(['no-shuffle']) import 'dart:async'; +import 'dart:io' as io; +import 'dart:typed_data'; import 'package:fake_async/fake_async.dart'; import 'package:file/src/interface/file.dart'; @@ -564,6 +566,115 @@ void main() { }, overrides: { DevtoolsLauncher: () => FakeDevtoolsLauncher(null), }); + + testUsingContext('proxy.connect tries to connect to an ipv4 address and proxies the connection correctly', () async { + final TestIOOverrides ioOverrides = TestIOOverrides(); + await io.IOOverrides.runWithIOOverrides(() async { + final FakeSocket socket = FakeSocket(); + bool connectCalled = false; + int connectPort; + ioOverrides.connectCallback = (dynamic host, int port) async { + connectCalled = true; + connectPort = port; + if (host == io.InternetAddress.loopbackIPv4) { + return socket; + } + throw const io.SocketException('fail'); + }; + + daemon = Daemon( + daemonConnection, + notifyingLogger: notifyingLogger, + ); + daemonStreams.inputs.add(DaemonMessage({'id': 0, 'method': 'proxy.connect', 'params': {'port': 123}})); + + final Stream broadcastOutput = daemonStreams.outputs.stream.asBroadcastStream(); + final DaemonMessage firstResponse = await broadcastOutput.firstWhere(_notEvent); + expect(firstResponse.data['id'], 0); + expect(firstResponse.data['result'], isNotNull); + expect(connectCalled, true); + expect(connectPort, 123); + + final Object id = firstResponse.data['result']; + + // Can send received data as event. + socket.controller.add(Uint8List.fromList([10, 11, 12])); + final DaemonMessage dataEvent = await broadcastOutput.firstWhere( + (DaemonMessage message) => message.data['event'] != null && message.data['event'] == 'proxy.data.$id', + ); + expect(dataEvent.binary, isNotNull); + final List> data = await dataEvent.binary.toList(); + expect(data[0], [10, 11, 12]); + + // Can proxy data to the socket. + daemonStreams.inputs.add(DaemonMessage({'id': 0, 'method': 'proxy.write', 'params': {'id': id}}, Stream>.value([21, 22, 23]))); + await pumpEventQueue(); + expect(socket.addedData[0], [21, 22, 23]); + + // Closes the connection when disconnect request received. + expect(socket.closeCalled, false); + daemonStreams.inputs.add(DaemonMessage({'id': 0, 'method': 'proxy.disconnect', 'params': {'id': id}})); + await pumpEventQueue(); + expect(socket.closeCalled, true); + + // Sends disconnected event when socket.done completer finishes. + socket.doneCompleter.complete(); + final DaemonMessage disconnectEvent = await broadcastOutput.firstWhere( + (DaemonMessage message) => message.data['event'] != null && message.data['event'] == 'proxy.disconnected.$id', + ); + expect(disconnectEvent.data, isNotNull); + }, ioOverrides); + }); + + testUsingContext('proxy.connect connects to ipv6 if ipv4 failed', () async { + final TestIOOverrides ioOverrides = TestIOOverrides(); + await io.IOOverrides.runWithIOOverrides(() async { + final FakeSocket socket = FakeSocket(); + bool connectIpv4Called = false; + int connectPort; + ioOverrides.connectCallback = (dynamic host, int port) async { + connectPort = port; + if (host == io.InternetAddress.loopbackIPv4) { + connectIpv4Called = true; + } else if (host == io.InternetAddress.loopbackIPv6) { + return socket; + } + throw const io.SocketException('fail'); + }; + + daemon = Daemon( + daemonConnection, + notifyingLogger: notifyingLogger, + ); + daemonStreams.inputs.add(DaemonMessage({'id': 0, 'method': 'proxy.connect', 'params': {'port': 123}})); + + final Stream broadcastOutput = daemonStreams.outputs.stream.asBroadcastStream(); + final DaemonMessage firstResponse = await broadcastOutput.firstWhere(_notEvent); + expect(firstResponse.data['id'], 0); + expect(firstResponse.data['result'], isNotNull); + expect(connectIpv4Called, true); + expect(connectPort, 123); + }, ioOverrides); + }); + + testUsingContext('proxy.connect fails if both ipv6 and ipv4 failed', () async { + final TestIOOverrides ioOverrides = TestIOOverrides(); + await io.IOOverrides.runWithIOOverrides(() async { + ioOverrides.connectCallback = (dynamic host, int port) => throw const io.SocketException('fail'); + + daemon = Daemon( + daemonConnection, + notifyingLogger: notifyingLogger, + ); + daemonStreams.inputs.add(DaemonMessage({'id': 0, 'method': 'proxy.connect', 'params': {'port': 123}})); + + final Stream broadcastOutput = daemonStreams.outputs.stream.asBroadcastStream(); + final DaemonMessage firstResponse = await broadcastOutput.firstWhere(_notEvent); + expect(firstResponse.data['id'], 0); + expect(firstResponse.data['result'], isNull); + expect(firstResponse.data['error'], isNotNull); + }, ioOverrides); + }); }); testUsingContext('notifyingLogger outputs trace messages in verbose mode', () async { @@ -851,3 +962,46 @@ class FakeApplicationPackageFactory implements ApplicationPackageFactory { } class FakeApplicationPackage extends Fake implements ApplicationPackage {} + +class TestIOOverrides extends io.IOOverrides { + Future Function(dynamic host, int port) connectCallback; + + @override + Future socketConnect(dynamic host, int port, + {dynamic sourceAddress, int sourcePort = 0, Duration timeout}) { + return connectCallback(host, port); + } +} + +class FakeSocket extends Fake implements io.Socket { + bool closeCalled = false; + final StreamController controller = StreamController(); + final List> addedData = >[]; + final Completer doneCompleter = Completer(); + + @override + StreamSubscription listen( + void Function(Uint8List event) onData, { + Function onError, + void Function() onDone, + bool cancelOnError, + }) { + return controller.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } + + @override + void add(List data) { + addedData.add(data); + } + + @override + Future close() async { + closeCalled = true; + } + + @override + Future get done => doneCompleter.future; + + @override + void destroy() {} +}