diff --git a/packages/flutter_tools/lib/src/commands/attach.dart b/packages/flutter_tools/lib/src/commands/attach.dart index 39d98c2a835..f93d73c6a10 100644 --- a/packages/flutter_tools/lib/src/commands/attach.dart +++ b/packages/flutter_tools/lib/src/commands/attach.dart @@ -18,6 +18,7 @@ import '../base/io.dart'; import '../build_info.dart'; import '../commands/daemon.dart'; import '../compile.dart'; +import '../daemon.dart'; import '../device.dart'; import '../device_port_forwarder.dart'; import '../fuchsia/fuchsia_device.dart'; @@ -235,8 +236,10 @@ known, it can be explicitly provided to attach via the command-line, e.g. final Daemon daemon = boolArg('machine') ? Daemon( - stdinCommandStream, - stdoutCommandResponse, + DaemonConnection( + daemonStreams: StdioDaemonStreams(globals.stdio), + logger: globals.logger, + ), notifyingLogger: (globals.logger is NotifyingLogger) ? globals.logger as NotifyingLogger : NotifyingLogger(verbose: globals.logger.isVerbose, parent: globals.logger), diff --git a/packages/flutter_tools/lib/src/commands/daemon.dart b/packages/flutter_tools/lib/src/commands/daemon.dart index 3a10b6a8b37..cfaa273e9af 100644 --- a/packages/flutter_tools/lib/src/commands/daemon.dart +++ b/packages/flutter_tools/lib/src/commands/daemon.dart @@ -18,7 +18,7 @@ import '../base/logger.dart'; import '../base/terminal.dart'; import '../base/utils.dart'; import '../build_info.dart'; -import '../convert.dart'; +import '../daemon.dart'; import '../device.dart'; import '../device_port_forwarder.dart'; import '../emulator.dart'; @@ -41,7 +41,13 @@ const String protocolVersion = '0.6.1'; /// It can be shutdown with a `daemon.shutdown` command (or by killing the /// process). class DaemonCommand extends FlutterCommand { - DaemonCommand({ this.hidden = false }); + DaemonCommand({ this.hidden = false }) { + argParser.addOption( + 'listen-on-tcp-port', + help: 'If specified, the daemon will be listening for commands on the specified port instead of stdio.', + valueHelp: 'port', + ); + } @override final String name = 'daemon'; @@ -57,9 +63,31 @@ class DaemonCommand extends FlutterCommand { @override Future runCommand() async { + if (argResults['listen-on-tcp-port'] != null) { + int port; + try { + port = int.parse(stringArg('listen-on-tcp-port')); + } on FormatException catch (error) { + throwToolExit('Invalid port for `--listen-on-tcp-port`: $error'); + } + + await _DaemonServer( + port: port, + logger: StdoutLogger( + terminal: globals.terminal, + stdio: globals.stdio, + outputPreferences: globals.outputPreferences, + ), + notifyingLogger: asLogger(globals.logger), + ).run(); + return FlutterCommandResult.success(); + } globals.printStatus('Starting device daemon...'); final Daemon daemon = Daemon( - stdinCommandStream, stdoutCommandResponse, + DaemonConnection( + daemonStreams: StdioDaemonStreams(globals.stdio), + logger: globals.logger, + ), notifyingLogger: asLogger(globals.logger), ); final int code = await daemon.onExit; @@ -70,14 +98,57 @@ class DaemonCommand extends FlutterCommand { } } -typedef DispatchCommand = void Function(Map command); +class _DaemonServer { + _DaemonServer({ + this.port, + this.logger, + this.notifyingLogger, + }); + + final int port; + + /// Stdout logger used to print general server-related errors. + final Logger logger; + + // Logger that sends the message to the other end of daemon connection. + final NotifyingLogger notifyingLogger; + + Future run() async { + final ServerSocket serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, port); + logger.printStatus('Daemon server listening on ${serverSocket.port}'); + + final StreamSubscription subscription = serverSocket.listen( + (Socket socket) async { + // We have to listen to socket.done. Otherwise when the connection is + // reset, we will receive an uncatchable exception. + // https://github.com/dart-lang/sdk/issues/25518 + final Future socketDone = socket.done.catchError((dynamic error, StackTrace stackTrace) { + logger.printError('Socket error: $error'); + logger.printTrace('$stackTrace'); + }); + final Daemon daemon = Daemon( + DaemonConnection( + daemonStreams: TcpDaemonStreams(socket, logger: logger), + logger: logger, + ), + notifyingLogger: notifyingLogger, + ); + await daemon.onExit; + await socketDone; + }, + ); + + // Wait indefinitely until the server closes. + await subscription.asFuture(); + await subscription.cancel(); + } +} typedef CommandHandler = Future Function(Map args); class Daemon { Daemon( - Stream> commandStream, - this.sendCommand, { + this.connection, { this.notifyingLogger, this.logToStdout = false, }) { @@ -89,9 +160,10 @@ class Daemon { _registerDomain(devToolsDomain = DevToolsDomain(this)); // Start listening. - _commandSubscription = commandStream.listen( + _commandSubscription = connection.incomingCommands.listen( _handleRequest, onDone: () { + shutdown(); if (!_onExitCompleter.isCompleted) { _onExitCompleter.complete(0); } @@ -99,16 +171,15 @@ class Daemon { ); } + final DaemonConnection connection; + DaemonDomain daemonDomain; AppDomain appDomain; DeviceDomain deviceDomain; EmulatorDomain emulatorDomain; DevToolsDomain devToolsDomain; StreamSubscription> _commandSubscription; - int _outgoingRequestId = 1; - final Map> _outgoingRequestCompleters = >{}; - final DispatchCommand sendCommand; final NotifyingLogger notifyingLogger; final bool logToStdout; @@ -134,62 +205,27 @@ class Daemon { try { final String method = request['method'] as String; - if (method != null) { - if (!method.contains('.')) { - throw 'method not understood: $method'; - } - - final String prefix = method.substring(0, method.indexOf('.')); - final String name = method.substring(method.indexOf('.') + 1); - if (_domainMap[prefix] == null) { - throw 'no domain for method: $method'; - } - - _domainMap[prefix].handleCommand(name, id, castStringKeyedMap(request['params']) ?? const {}); - } else { - // If there was no 'method' field then it's a response to a daemon-to-editor request. - final Completer completer = _outgoingRequestCompleters[id.toString()]; - if (completer == null) { - throw 'unexpected response with id: $id'; - } - _outgoingRequestCompleters.remove(id.toString()); - - if (request['error'] != null) { - completer.completeError(request['error']); - } else { - completer.complete(request['result']); - } + assert(method != null); + if (!method.contains('.')) { + throw 'method not understood: $method'; } + + final String prefix = method.substring(0, method.indexOf('.')); + final String name = method.substring(method.indexOf('.') + 1); + if (_domainMap[prefix] == null) { + throw 'no domain for method: $method'; + } + + _domainMap[prefix].handleCommand(name, id, castStringKeyedMap(request['params']) ?? const {}); } on Exception catch (error, trace) { - _send({ - 'id': id, - 'error': _toJsonable(error), - 'trace': '$trace', - }); + connection.sendErrorResponse(id, _toJsonable(error), trace); } } - Future sendRequest(String method, [ dynamic args ]) { - final Map map = {'method': method}; - if (args != null) { - map['params'] = _toJsonable(args); - } - - final int id = _outgoingRequestId++; - final Completer completer = Completer(); - - map['id'] = id.toString(); - _outgoingRequestCompleters[id.toString()] = completer; - - _send(map); - return completer.future; - } - - void _send(Map map) => sendCommand(map); - Future shutdown({ dynamic error }) async { await devToolsDomain?.dispose(); await _commandSubscription?.cancel(); + await connection.dispose(); for (final Domain domain in _domainMap.values) { await domain.dispose(); } @@ -225,30 +261,16 @@ abstract class Domain { } throw 'command not understood: $name.$command'; }).then((dynamic result) { - if (result == null) { - _send({'id': id}); - } else { - _send({'id': id, 'result': _toJsonable(result)}); - } - }).catchError((dynamic error, dynamic trace) { - _send({ - 'id': id, - 'error': _toJsonable(error), - 'trace': '$trace', - }); + daemon.connection.sendResponse(id, _toJsonable(result)); + }).catchError((Object error, StackTrace stackTrace) { + daemon.connection.sendErrorResponse(id, _toJsonable(error), stackTrace); }); } void sendEvent(String name, [ dynamic args ]) { - final Map map = {'event': name}; - if (args != null) { - map['params'] = _toJsonable(args); - } - _send(map); + daemon.connection.sendEvent(name, _toJsonable(args)); } - void _send(Map map) => daemon._send(map); - String _getStringArg(Map args, String name, { bool required = false }) { if (required && !args.containsKey(name)) { throw '$name is required'; @@ -346,7 +368,7 @@ class DaemonDomain extends Domain { /// --web-allow-expose-url switch. The client may return the same URL back if /// tunnelling is not required for a given URL. Future exposeUrl(String url) async { - final dynamic res = await daemon.sendRequest('app.exposeUrl', {'url': url}); + final dynamic res = await daemon.connection.sendRequest('app.exposeUrl', {'url': url}); if (res is Map && res['url'] is String) { return res['url'] as String; } else { @@ -907,35 +929,6 @@ class DevToolsDomain extends Domain { } } -Stream> get stdinCommandStream => globals.stdio.stdin - .transform(utf8.decoder) - .transform(const LineSplitter()) - .where((String line) => line.startsWith('[{') && line.endsWith('}]')) - .map>((String line) { - line = line.substring(1, line.length - 1); - return castStringKeyedMap(json.decode(line)); - }); - -void stdoutCommandResponse(Map command) { - globals.stdio.stdoutWrite( - '[${jsonEncodeObject(command)}]\n', - fallback: (String message, dynamic error, StackTrace stack) { - throwToolExit('Failed to write daemon command response to stdout: $error'); - }, - ); -} - -String jsonEncodeObject(dynamic object) { - return json.encode(object, toEncodable: _toEncodable); -} - -dynamic _toEncodable(dynamic object) { - if (object is OperationResult) { - return _operationResultToMap(object); - } - return object; -} - Future> _deviceToMap(Device device) async { return { 'id': device.id, @@ -970,7 +963,7 @@ dynamic _toJsonable(dynamic obj) { return obj; } if (obj is OperationResult) { - return obj; + return _operationResultToMap(obj); } if (obj is ToolExit) { return obj.message; diff --git a/packages/flutter_tools/lib/src/commands/run.dart b/packages/flutter_tools/lib/src/commands/run.dart index 01d34cea6ab..2bf805017b9 100644 --- a/packages/flutter_tools/lib/src/commands/run.dart +++ b/packages/flutter_tools/lib/src/commands/run.dart @@ -14,6 +14,7 @@ import '../base/common.dart'; import '../base/file_system.dart'; import '../base/utils.dart'; import '../build_info.dart'; +import '../daemon.dart'; import '../device.dart'; import '../features.dart'; import '../globals.dart' as globals; @@ -556,8 +557,10 @@ class RunCommand extends RunCommandBase { throwToolExit('"--machine" does not support "-d all".'); } final Daemon daemon = Daemon( - stdinCommandStream, - stdoutCommandResponse, + DaemonConnection( + daemonStreams: StdioDaemonStreams(globals.stdio), + logger: globals.logger, + ), notifyingLogger: (globals.logger is NotifyingLogger) ? globals.logger as NotifyingLogger : NotifyingLogger(verbose: globals.logger.isVerbose, parent: globals.logger), diff --git a/packages/flutter_tools/lib/src/daemon.dart b/packages/flutter_tools/lib/src/daemon.dart new file mode 100644 index 00000000000..d5be7380a93 --- /dev/null +++ b/packages/flutter_tools/lib/src/daemon.dart @@ -0,0 +1,241 @@ +// Copyright 2014 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; + +import 'base/common.dart'; +import 'base/io.dart'; +import 'base/logger.dart'; +import 'base/utils.dart'; +import 'convert.dart'; + +/// Parse binary streams in the JSON RPC format understood by the daemon, and +/// convert it into a stream of JSON RPC messages. +Stream> _convertInputStream(Stream> inputStream) { + return utf8.decoder.bind(inputStream) + .transform(const LineSplitter()) + .where((String line) => line.startsWith('[{') && line.endsWith('}]')) + .map?>((String line) { + line = line.substring(1, line.length - 1); + return castStringKeyedMap(json.decode(line)); + }) + .where((Map? entry) => entry != null) + .cast>(); +} + +/// A stream that a [DaemonConnection] uses to communicate with each other. +abstract class DaemonStreams { + /// Stream that contains input to the [DaemonConnection]. + Stream> get inputStream; + + /// Outputs a message through the connection. + void send(Map message); + + /// Cleans up any resources used. + Future dispose() async { } +} + +/// A [DaemonStream] that uses stdin and stdout as the underlying streams. +class StdioDaemonStreams extends DaemonStreams { + StdioDaemonStreams(Stdio stdio) : + _stdio = stdio, + inputStream = _convertInputStream(stdio.stdin); + + final Stdio _stdio; + + @override + final Stream> inputStream; + + @override + void send(Map message) { + _stdio.stdoutWrite( + '[${json.encode(message)}]\n', + fallback: (String message, Object? error, StackTrace stack) { + throwToolExit('Failed to write daemon command response to stdout: $error'); + }, + ); + } +} + +/// A [DaemonStream] that uses [Socket] as the underlying stream. +class TcpDaemonStreams extends DaemonStreams { + /// Creates a [DaemonStreams] with an existing [Socket]. + TcpDaemonStreams( + Socket socket, { + required Logger logger, + }): _logger = logger { + _socket = Future.value(_initializeSocket(socket)); + } + + /// Connects to a remote host and creates a [DaemonStreams] from the socket. + TcpDaemonStreams.connect( + String host, + int port, { + required Logger logger, + }) : _logger = logger { + _socket = Socket.connect(host, port).then(_initializeSocket); + } + + late final Future _socket; + final StreamController> _commands = StreamController>(); + final Logger _logger; + + @override + Stream> get inputStream => _commands.stream; + + @override + void send(Map message) { + _socket.then((Socket socket) { + try { + socket.write('[${json.encode(message)}]\n'); + } on SocketException catch (error) { + _logger.printError('Failed to write daemon command response to socket: $error'); + // Failed to send, close the connection + socket.close(); + } + }); + } + + Socket _initializeSocket(Socket socket) { + _commands.addStream(_convertInputStream(socket)); + return socket; + } + + @override + Future dispose() async { + await (await _socket).close(); + } +} + +/// Connection between a flutter daemon and a client. +class DaemonConnection { + DaemonConnection({ + required DaemonStreams daemonStreams, + required Logger logger, + }): _logger = logger, + _daemonStreams = daemonStreams { + _commandSubscription = daemonStreams.inputStream.listen( + _handleData, + onError: (Object error, StackTrace stackTrace) { + // We have to listen for on error otherwise the error on the socket + // will end up in the Zone error handler. + // Do nothing here and let the stream close handlers handle shutting + // down the daemon. + } + ); + } + + final DaemonStreams _daemonStreams; + + final Logger _logger; + + late final StreamSubscription> _commandSubscription; + + int _outgoingRequestId = 0; + final Map> _outgoingRequestCompleters = >{}; + + final StreamController> _events = StreamController>.broadcast(); + final StreamController> _incomingCommands = StreamController>(); + + /// A stream that contains all the incoming requests. + Stream> get incomingCommands => _incomingCommands.stream; + + /// Listens to the event with the event name [eventToListen]. + Stream listenToEvent(String eventToListen) { + return _events.stream + .where((Map event) => event['event'] == eventToListen) + .map((Map event) => event['params']); + } + + /// Sends a request to the other end of the connection. + /// + /// Returns a [Future] that resolves with the content. + Future sendRequest(String method, [Object? params]) async { + final String id = '${++_outgoingRequestId}'; + final Completer completer = Completer(); + _outgoingRequestCompleters[id] = completer; + final Map data = { + 'id': id, + 'method': method, + if (params != null) 'params': params, + }; + _logger.printTrace('-> Sending to daemon, id = $id, method = $method'); + _daemonStreams.send(data); + return completer.future; + } + + /// Sends a response to the other end of the connection. + void sendResponse(Object id, [Object? result]) { + _daemonStreams.send({ + 'id': id, + if (result != null) 'result': result, + }); + } + + /// Sends an error response to the other end of the connection. + void sendErrorResponse(Object id, Object error, StackTrace trace) { + _daemonStreams.send({ + 'id': id, + 'error': error, + 'trace': '$trace', + }); + } + + /// Sends an event to the client. + void sendEvent(String name, [ Object? params ]) { + _daemonStreams.send({ + 'event': name, + if (params != null) 'params': params, + }); + } + + /// Handles the input from the stream. + /// + /// There are three kinds of data: Request, Response, Event. + /// + /// Request: + /// {"id": . "method": , "params": } + /// + /// Response: + /// {"id": . "result": } for a successful response. + /// {"id": . "error": , "stackTrace": } for an error response. + /// + /// Event: + /// {"event": . "params": } + void _handleData(Map data) { + if (data['id'] != null) { + if (data['method'] == null) { + // This is a response to previously sent request. + final String id = data['id']! as String; + if (data['error'] != null) { + // This is an error response. + _logger.printTrace('<- Error response received from daemon, id = $id'); + final Object error = data['error']!; + final String stackTrace = data['stackTrace'] as String? ?? ''; + _outgoingRequestCompleters.remove(id)?.completeError(error, StackTrace.fromString(stackTrace)); + } else { + _logger.printTrace('<- Response received from daemon, id = $id'); + final Object? result = data['result']; + _outgoingRequestCompleters.remove(id)?.complete(result); + } + } else { + _incomingCommands.add(data); + } + } else if (data['event'] != null) { + // This is an event + _logger.printTrace('<- Event received: ${data['event']}'); + _events.add(data); + } else { + _logger.printError('Unknown data received from daemon'); + } + } + + /// Cleans up any resources used in the connection. + Future dispose() async { + await _commandSubscription.cancel(); + await _daemonStreams.dispose(); + unawaited(_events.close()); + unawaited(_incomingCommands.close()); + } +} diff --git a/packages/flutter_tools/test/commands.shard/hermetic/daemon_test.dart b/packages/flutter_tools/test/commands.shard/hermetic/daemon_test.dart index fdbd23d9091..d9af9f4cf50 100644 --- a/packages/flutter_tools/test/commands.shard/hermetic/daemon_test.dart +++ b/packages/flutter_tools/test/commands.shard/hermetic/daemon_test.dart @@ -20,6 +20,7 @@ import 'package:flutter_tools/src/base/logger.dart'; import 'package:flutter_tools/src/base/utils.dart'; import 'package:flutter_tools/src/build_info.dart'; import 'package:flutter_tools/src/commands/daemon.dart'; +import 'package:flutter_tools/src/daemon.dart'; import 'package:flutter_tools/src/device.dart'; import 'package:flutter_tools/src/features.dart'; import 'package:flutter_tools/src/fuchsia/fuchsia_workflow.dart'; @@ -48,75 +49,92 @@ Future _runFakeAsync(Future Function(FakeAsync time) f) async { }); } +class FakeDaemonStreams extends DaemonStreams { + final StreamController> inputs = StreamController>(); + final StreamController> outputs = StreamController>(); + + @override + Stream> get inputStream { + return inputs.stream; + } + + @override + void send(Map message) { + outputs.add(message); + } + + @override + Future dispose() async { + await inputs.close(); + // In some tests, outputs have no listeners. We don't wait for outputs to close. + unawaited(outputs.close()); + } +} + void main() { Daemon daemon; NotifyingLogger notifyingLogger; BufferLogger bufferLogger; group('daemon', () { + FakeDaemonStreams daemonStreams; + DaemonConnection daemonConnection; setUp(() { bufferLogger = BufferLogger.test(); notifyingLogger = NotifyingLogger(verbose: false, parent: bufferLogger); + daemonStreams = FakeDaemonStreams(); + daemonConnection = DaemonConnection( + daemonStreams: daemonStreams, + logger: bufferLogger, + ); }); - tearDown(() { + tearDown(() async { if (daemon != null) { return daemon.shutdown(); } notifyingLogger.dispose(); + await daemonConnection.dispose(); }); testUsingContext('daemon.version command should succeed', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'daemon.version'}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'daemon.version'}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['result'], isNotEmpty); expect(response['result'], isA()); - await responses.close(); - await commands.close(); }); testUsingContext('daemon.getSupportedPlatforms command should succeed', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); // Use the flutter_gallery project which has a known set of supported platforms. final String projectPath = globals.fs.path.join(getFlutterRoot(), 'dev', 'integration_tests', 'flutter_gallery'); - commands.add({'id': 0, 'method': 'daemon.getSupportedPlatforms', 'params': {'projectRoot': projectPath}}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'daemon.getSupportedPlatforms', 'params': {'projectRoot': projectPath}}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['result'], isNotEmpty); expect((response['result'] as Map)['platforms'], {'macos'}); - await responses.close(); - await commands.close(); }, overrides: { // Disable Android/iOS and enable macOS to make sure result is consistent and defaults are tested off. FeatureFlags: () => TestFeatureFlags(isAndroidEnabled: false, isIOSEnabled: false, isMacOSEnabled: true), }); testUsingContext('printError should send daemon.logMessage event', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); globals.printError('daemon.logMessage test'); - final Map response = await responses.stream.firstWhere((Map map) { + final Map response = await daemonStreams.outputs.stream.firstWhere((Map map) { return map['event'] == 'daemon.logMessage' && (map['params'] as Map)['level'] == 'error'; }); expect(response['id'], isNull); @@ -124,22 +142,17 @@ void main() { final Map logMessage = castStringKeyedMap(response['params']).cast(); expect(logMessage['level'], 'error'); expect(logMessage['message'], 'daemon.logMessage test'); - await responses.close(); - await commands.close(); }, overrides: { Logger: () => notifyingLogger, }); testUsingContext('printWarning should send daemon.logMessage event', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); globals.printWarning('daemon.logMessage test'); - final Map response = await responses.stream.firstWhere((Map map) { + final Map response = await daemonStreams.outputs.stream.firstWhere((Map map) { return map['event'] == 'daemon.logMessage' && (map['params'] as Map)['level'] == 'warning'; }); expect(response['id'], isNull); @@ -147,19 +160,14 @@ void main() { final Map logMessage = castStringKeyedMap(response['params']).cast(); expect(logMessage['level'], 'warning'); expect(logMessage['message'], 'daemon.logMessage test'); - await responses.close(); - await commands.close(); }, overrides: { Logger: () => notifyingLogger, }); testUsingContext('printStatus should log to stdout when logToStdout is enabled', () async { final StringBuffer buffer = await capturedConsolePrint(() { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, logToStdout: true, ); @@ -192,120 +200,89 @@ void main() { }); testUsingContext('daemon.shutdown command should stop daemon', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'daemon.shutdown'}); + daemonStreams.inputs.add({'id': 0, 'method': 'daemon.shutdown'}); return daemon.onExit.then((int code) async { - await commands.close(); + await daemonStreams.inputs.close(); expect(code, 0); }); }); testUsingContext('app.restart without an appId should report an error', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'app.restart'}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'app.restart'}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['error'], contains('appId is required')); - await responses.close(); - await commands.close(); }); testUsingContext('ext.flutter.debugPaint via service extension without an appId should report an error', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({ + daemonStreams.inputs.add({ 'id': 0, 'method': 'app.callServiceExtension', 'params': { 'methodName': 'ext.flutter.debugPaint', }, }); - final Map response = await responses.stream.firstWhere(_notEvent); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['error'], contains('appId is required')); - await responses.close(); - await commands.close(); }); testUsingContext('app.stop without appId should report an error', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'app.stop'}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'app.stop'}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['error'], contains('appId is required')); - await responses.close(); - await commands.close(); }); testUsingContext('device.getDevices should respond with list', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'device.getDevices'}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'device.getDevices'}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['result'], isList); - await responses.close(); - await commands.close(); }); testUsingContext('device.getDevices reports available devices', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery(); daemon.deviceDomain.addDeviceDiscoverer(discoverer); discoverer.addDevice(FakeAndroidDevice()); - commands.add({'id': 0, 'method': 'device.getDevices'}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'device.getDevices'}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); final dynamic result = response['result']; expect(result, isList); expect(result, isNotEmpty); - await responses.close(); - await commands.close(); }); testUsingContext('should send device.added event when device is discovered', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); @@ -313,15 +290,12 @@ void main() { daemon.deviceDomain.addDeviceDiscoverer(discoverer); discoverer.addDevice(FakeAndroidDevice()); - return responses.stream.skipWhile(_isConnectedEvent).first.then((Map response) async { + return daemonStreams.outputs.stream.skipWhile(_isConnectedEvent).first.then((Map response) async { expect(response['event'], 'device.added'); expect(response['params'], isMap); final Map params = castStringKeyedMap(response['params']); expect(params['platform'], isNotEmpty); // the fake device has a platform of 'android-arm' - - await responses.close(); - await commands.close(); }); }, overrides: { AndroidWorkflow: () => FakeAndroidWorkflow(), @@ -330,121 +304,90 @@ void main() { }); testUsingContext('emulator.launch without an emulatorId should report an error', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'emulator.launch'}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'emulator.launch'}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['error'], contains('emulatorId is required')); - await responses.close(); - await commands.close(); }); testUsingContext('emulator.launch coldboot parameter must be boolean', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); final Map params = {'emulatorId': 'device', 'coldBoot': 1}; - commands.add({'id': 0, 'method': 'emulator.launch', 'params': params}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'emulator.launch', 'params': params}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['error'], contains('coldBoot is not a bool')); - await responses.close(); - await commands.close(); }); testUsingContext('emulator.getEmulators should respond with list', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'emulator.getEmulators'}); - final Map response = await responses.stream.firstWhere(_notEvent); + daemonStreams.inputs.add({'id': 0, 'method': 'emulator.getEmulators'}); + final Map response = await daemonStreams.outputs.stream.firstWhere(_notEvent); expect(response['id'], 0); expect(response['result'], isList); - await responses.close(); - await commands.close(); }); testUsingContext('daemon can send exposeUrl requests to the client', () async { const String originalUrl = 'http://localhost:1234/'; const String mappedUrl = 'https://publichost:4321/'; - final StreamController> input = StreamController>(); - final StreamController> output = StreamController>(); daemon = Daemon( - input.stream, - output.add, + daemonConnection, notifyingLogger: notifyingLogger, ); // Respond to any requests from the daemon to expose a URL. - unawaited(output.stream + unawaited(daemonStreams.outputs.stream .firstWhere((Map request) => request['method'] == 'app.exposeUrl') .then((Map request) { expect((request['params'] as Map)['url'], equals(originalUrl)); - input.add({'id': request['id'], 'result': {'url': mappedUrl}}); + daemonStreams.inputs.add({'id': request['id'], 'result': {'url': mappedUrl}}); }) ); final String exposedUrl = await daemon.daemonDomain.exposeUrl(originalUrl); expect(exposedUrl, equals(mappedUrl)); - - await output.close(); - await input.close(); }); testUsingContext('devtools.serve command should return host and port on success', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'devtools.serve'}); - final Map response = await responses.stream.firstWhere((Map response) => response['id'] == 0); + daemonStreams.inputs.add({'id': 0, 'method': 'devtools.serve'}); + final Map response = await daemonStreams.outputs.stream.firstWhere((Map response) => response['id'] == 0); final Map result = response['result'] as Map; expect(result, isNotEmpty); expect(result['host'], '127.0.0.1'); expect(result['port'], 1234); - await responses.close(); - await commands.close(); }, overrides: { DevtoolsLauncher: () => FakeDevtoolsLauncher(DevToolsServerAddress('127.0.0.1', 1234)), }); testUsingContext('devtools.serve command should return null fields if null returned', () async { - final StreamController> commands = StreamController>(); - final StreamController> responses = StreamController>(); daemon = Daemon( - commands.stream, - responses.add, + daemonConnection, notifyingLogger: notifyingLogger, ); - commands.add({'id': 0, 'method': 'devtools.serve'}); - final Map response = await responses.stream.firstWhere((Map response) => response['id'] == 0); + daemonStreams.inputs.add({'id': 0, 'method': 'devtools.serve'}); + final Map response = await daemonStreams.outputs.stream.firstWhere((Map response) => response['id'] == 0); final Map result = response['result'] as Map; expect(result, isNotEmpty); expect(result['host'], null); expect(result['port'], null); - await responses.close(); - await commands.close(); }, overrides: { DevtoolsLauncher: () => FakeDevtoolsLauncher(null), }); @@ -483,19 +426,6 @@ void main() { expect(message.message, 'hello'); }); - group('daemon serialization', () { - test('OperationResult', () { - expect( - jsonEncodeObject(OperationResult.ok), - '{"code":0,"message":""}', - ); - expect( - jsonEncodeObject(OperationResult(1, 'foo')), - '{"code":1,"message":"foo"}', - ); - }); - }); - group('daemon queue', () { DebounceOperationQueue queue; const Duration debounceDuration = Duration(seconds: 1); diff --git a/packages/flutter_tools/test/general.shard/daemon_test.dart b/packages/flutter_tools/test/general.shard/daemon_test.dart new file mode 100644 index 00000000000..fb2d9954d35 --- /dev/null +++ b/packages/flutter_tools/test/general.shard/daemon_test.dart @@ -0,0 +1,240 @@ +// Copyright 2014 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:flutter_tools/src/base/common.dart'; +import 'package:flutter_tools/src/base/io.dart'; +import 'package:flutter_tools/src/base/logger.dart'; +import 'package:flutter_tools/src/convert.dart'; +import 'package:flutter_tools/src/daemon.dart'; +import 'package:test/fake.dart'; + +import '../src/common.dart'; + +class FakeDaemonStreams extends DaemonStreams { + final StreamController> inputs = StreamController>(); + final StreamController> outputs = StreamController>(); + + @override + Stream> get inputStream { + return inputs.stream; + } + + @override + void send(Map message) { + outputs.add(message); + } + + @override + Future dispose() async { + await inputs.close(); + // In some tests, outputs have no listeners. We don't wait for outputs to close. + unawaited(outputs.close()); + } +} + +void main() { + late BufferLogger bufferLogger; + late FakeDaemonStreams daemonStreams; + late DaemonConnection daemonConnection; + setUp(() { + bufferLogger = BufferLogger.test(); + daemonStreams = FakeDaemonStreams(); + daemonConnection = DaemonConnection( + daemonStreams: daemonStreams, + logger: bufferLogger, + ); + }); + + tearDown(() async { + await daemonConnection.dispose(); + }); + + group('DaemonConnection receiving end', () { + testWithoutContext('redirects input to incoming commands', () async { + final Map commandToSend = {'id': 0, 'method': 'some_method'}; + daemonStreams.inputs.add(commandToSend); + + final Map commandReceived = await daemonConnection.incomingCommands.first; + await daemonStreams.dispose(); + + expect(commandReceived, commandToSend); + }); + + testWithoutContext('listenToEvent can receive the right events', () async { + final Future> events = daemonConnection.listenToEvent('event1').toList(); + + daemonStreams.inputs.add({'event': 'event1', 'params': '1'}); + daemonStreams.inputs.add({'event': 'event2', 'params': '2'}); + daemonStreams.inputs.add({'event': 'event1', 'params': null}); + daemonStreams.inputs.add({'event': 'event1', 'params': 3}); + + await pumpEventQueue(); + await daemonConnection.dispose(); + + expect(await events, ['1', null, 3]); + }); + }); + + group('DaemonConnection sending end', () { + testWithoutContext('sending requests', () async { + unawaited(daemonConnection.sendRequest('some_method', 'param')); + final Map data = await daemonStreams.outputs.stream.first; + expect(data['id'], isNotNull); + expect(data['method'], 'some_method'); + expect(data['params'], 'param'); + }); + + testWithoutContext('sending requests without param', () async { + unawaited(daemonConnection.sendRequest('some_method')); + final Map data = await daemonStreams.outputs.stream.first; + expect(data['id'], isNotNull); + expect(data['method'], 'some_method'); + expect(data['params'], isNull); + }); + + testWithoutContext('sending response', () async { + daemonConnection.sendResponse('1', 'some_data'); + final Map data = await daemonStreams.outputs.stream.first; + expect(data['id'], '1'); + expect(data['method'], isNull); + expect(data['error'], isNull); + expect(data['result'], 'some_data'); + }); + + testWithoutContext('sending response without data', () async { + daemonConnection.sendResponse('1'); + final Map data = await daemonStreams.outputs.stream.first; + expect(data['id'], '1'); + expect(data['method'], isNull); + expect(data['error'], isNull); + expect(data['result'], isNull); + }); + + testWithoutContext('sending error response', () async { + daemonConnection.sendErrorResponse('1', 'error', StackTrace.fromString('stack trace')); + final Map data = await daemonStreams.outputs.stream.first; + expect(data['id'], '1'); + expect(data['method'], isNull); + expect(data['error'], 'error'); + expect(data['trace'], 'stack trace'); + }); + + testWithoutContext('sending events', () async { + daemonConnection.sendEvent('some_event', '123'); + final Map data = await daemonStreams.outputs.stream.first; + expect(data['id'], isNull); + expect(data['event'], 'some_event'); + expect(data['params'], '123'); + }); + + testWithoutContext('sending events without params', () async { + daemonConnection.sendEvent('some_event'); + final Map data = await daemonStreams.outputs.stream.first; + expect(data['id'], isNull); + expect(data['event'], 'some_event'); + expect(data['params'], isNull); + }); + }); + + group('DaemonConnection request and response', () { + testWithoutContext('receiving response from requests', () async { + final Future requestFuture = daemonConnection.sendRequest('some_method', 'param'); + final Map data = await daemonStreams.outputs.stream.first; + + expect(data['id'], isNotNull); + expect(data['method'], 'some_method'); + expect(data['params'], 'param'); + + final String id = data['id'] as String; + daemonStreams.inputs.add({'id': id, 'result': '123'}); + expect(await requestFuture, '123'); + }); + + testWithoutContext('receiving response from requests without result', () async { + final Future requestFuture = daemonConnection.sendRequest('some_method', 'param'); + final Map data = await daemonStreams.outputs.stream.first; + + expect(data['id'], isNotNull); + expect(data['method'], 'some_method'); + expect(data['params'], 'param'); + + final String id = data['id'] as String; + daemonStreams.inputs.add({'id': id}); + expect(await requestFuture, null); + }); + + testWithoutContext('receiving error response from requests without result', () async { + final Future requestFuture = daemonConnection.sendRequest('some_method', 'param'); + final Map data = await daemonStreams.outputs.stream.first; + + expect(data['id'], isNotNull); + expect(data['method'], 'some_method'); + expect(data['params'], 'param'); + + final String id = data['id'] as String; + daemonStreams.inputs.add({'id': id, 'error': 'some_error', 'trace': 'stack trace'}); + expect(requestFuture, throwsA('some_error')); + }); + }); + + group('TcpDaemonStreams', () { + final Map testCommand = { + 'id': 100, + 'method': 'test', + }; + late FakeSocket socket; + late TcpDaemonStreams daemonStreams; + setUp(() { + socket = FakeSocket(); + daemonStreams = TcpDaemonStreams(socket, logger: bufferLogger); + }); + + test('parses the message received on the socket', () async { + socket.controller.add(Uint8List.fromList(utf8.encode('[${jsonEncode(testCommand)}]\n'))); + final Map command = await daemonStreams.inputStream.first; + expect(command, testCommand); + }); + + test('sends the encoded message through the socket', () async { + daemonStreams.send(testCommand); + await pumpEventQueue(); + expect(socket.writtenObjects.length, 1); + expect(socket.writtenObjects[0].toString(), '[${jsonEncode(testCommand)}]\n'); + }); + + test('dispose calls socket.close', () async { + await daemonStreams.dispose(); + expect(socket.closeCalled, isTrue); + }); + }); +} + +class FakeSocket extends Fake implements Socket { + bool closeCalled = false; + final StreamController controller = StreamController(); + final List writtenObjects = []; + + @override + StreamSubscription listen( + void Function(Uint8List event)? onData, { + Function? onError, + void Function()? onDone, + bool? cancelOnError, + }) { + return controller.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } + + @override + void write(Object? object) { + writtenObjects.add(object); + } + + @override + Future close() async { + closeCalled = true; + } +}