diff --git a/packages/flutter_tools/lib/executable.dart b/packages/flutter_tools/lib/executable.dart index 6891fc46ec8..2fa5e20923f 100644 --- a/packages/flutter_tools/lib/executable.dart +++ b/packages/flutter_tools/lib/executable.dart @@ -168,8 +168,14 @@ Future main(List args) async { _createCrashReport(args, error, chain).then((File file) { stderr.writeln( - 'Crash report written to ${file.path};\n' - 'please let us know at https://github.com/flutter/flutter/issues.' + 'Crash report written to ${file.path};\n' + 'please let us know at https://github.com/flutter/flutter/issues.', + ); + _exit(1); + }).catchError((dynamic error) { + stderr.writeln( + 'Unable to generate crash report due to secondary error: $error\n' + 'please let us know at https://github.com/flutter/flutter/issues.', ); _exit(1); }); @@ -180,6 +186,7 @@ Future main(List args) async { } Future _createCrashReport(List args, dynamic error, Chain chain) async { + FileSystem fs = const LocalFileSystem(); File crashFile = getUniqueFile(fs.currentDirectory, 'flutter', 'log'); StringBuffer buffer = new StringBuffer(); @@ -197,12 +204,12 @@ Future _createCrashReport(List args, dynamic error, Chain chain) a buffer.writeln('```\n${await _doctorText()}```'); try { - crashFile.writeAsStringSync(buffer.toString()); + await crashFile.writeAsString(buffer.toString()); } on FileSystemException catch (_) { // Fallback to the system temporary directory. crashFile = getUniqueFile(fs.systemTempDirectory, 'flutter', 'log'); try { - crashFile.writeAsStringSync(buffer.toString()); + await crashFile.writeAsString(buffer.toString()); } on FileSystemException catch (e) { printError('Could not write crash report to disk: $e'); printError(buffer.toString()); diff --git a/packages/flutter_tools/lib/src/base/process.dart b/packages/flutter_tools/lib/src/base/process.dart index c8edfbfd8a5..b22671e4408 100644 --- a/packages/flutter_tools/lib/src/base/process.dart +++ b/packages/flutter_tools/lib/src/base/process.dart @@ -27,8 +27,10 @@ Future runShutdownHooks() async { _shutdownHooks.clear(); _shutdownHooksRunning = true; try { + List> futures = >[]; for (ShutdownHook shutdownHook in hooks) - await shutdownHook(); + futures.add(shutdownHook()); + await Future.wait(futures); } finally { _shutdownHooksRunning = false; } diff --git a/packages/flutter_tools/lib/src/base/utils.dart b/packages/flutter_tools/lib/src/base/utils.dart index 608fa7e0a13..cfc216cf14c 100644 --- a/packages/flutter_tools/lib/src/base/utils.dart +++ b/packages/flutter_tools/lib/src/base/utils.dart @@ -61,6 +61,7 @@ String getEnumName(dynamic enumItem) { } File getUniqueFile(Directory dir, String baseName, String ext) { + FileSystem fs = dir.fileSystem; int i = 1; while (true) { diff --git a/packages/flutter_tools/lib/src/runner/flutter_command_runner.dart b/packages/flutter_tools/lib/src/runner/flutter_command_runner.dart index 5cb6ca955d3..09cc64f9db8 100644 --- a/packages/flutter_tools/lib/src/runner/flutter_command_runner.dart +++ b/packages/flutter_tools/lib/src/runner/flutter_command_runner.dart @@ -22,6 +22,7 @@ import '../device.dart'; import '../globals.dart'; import '../usage.dart'; import '../version.dart'; +import '../vmservice.dart'; const String kFlutterRootEnvironmentVariableName = 'FLUTTER_ROOT'; // should point to //flutter/ (root of flutter/flutter repo) const String kFlutterEngineEnvironmentVariableName = 'FLUTTER_ENGINE'; // should point to //engine/src/ (root of flutter/engine repo) @@ -169,6 +170,7 @@ class FlutterCommandRunner extends CommandRunner { enableRecordingProcessManager(recordTo); enableRecordingFileSystem(recordTo); await enableRecordingPlatform(recordTo); + VMService.enableRecordingConnection(recordTo); } if (globalResults['replay-from'] != null) { @@ -178,6 +180,7 @@ class FlutterCommandRunner extends CommandRunner { await enableReplayProcessManager(replayFrom); enableReplayFileSystem(replayFrom); await enableReplayPlatform(replayFrom); + VMService.enableReplayConnection(replayFrom); } logger.quiet = globalResults['quiet']; diff --git a/packages/flutter_tools/lib/src/vmservice.dart b/packages/flutter_tools/lib/src/vmservice.dart index b3114d83749..cc6ded44cd4 100644 --- a/packages/flutter_tools/lib/src/vmservice.dart +++ b/packages/flutter_tools/lib/src/vmservice.dart @@ -5,9 +5,9 @@ import 'dart:async'; import 'dart:convert' show BASE64; +import 'package:file/file.dart'; import 'package:json_rpc_2/error_code.dart' as rpc_error_code; import 'package:json_rpc_2/json_rpc_2.dart' as rpc; -import 'package:meta/meta.dart'; import 'package:stream_channel/stream_channel.dart'; import 'package:web_socket_channel/io.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; @@ -15,14 +15,17 @@ import 'package:web_socket_channel/web_socket_channel.dart'; import 'base/common.dart'; import 'base/file_system.dart'; import 'globals.dart'; +import 'vmservice_record_replay.dart'; /// A function that opens a two-way communication channel to the specified [uri]. -typedef StreamChannel OpenChannel(Uri uri); +typedef StreamChannel _OpenChannel(Uri uri); -OpenChannel _openChannel = _defaultOpenChannel; +_OpenChannel _openChannel = _defaultOpenChannel; -StreamChannel _defaultOpenChannel(Uri uri) => - new IOWebSocketChannel.connect(uri.toString()); +const String _kRecordingType = 'vmservice'; + +StreamChannel _defaultOpenChannel(Uri uri) => + new IOWebSocketChannel.connect(uri.toString()).cast(); /// The default VM service request timeout. const Duration kDefaultRequestTimeout = const Duration(seconds: 10); @@ -43,9 +46,29 @@ class VMService { }); } - @visibleForTesting - static void setOpenChannelForTesting(OpenChannel openChannel) { - _openChannel = openChannel; + /// Enables recording of VMService JSON-rpc activity to the specified base + /// recording [location]. + /// + /// Activity will be recorded in a subdirectory of [location] named + /// `"vmservice"`. It is permissible for [location] to represent an existing + /// non-empty directory as long as there is no collision with the + /// `"vmservice"` subdirectory. + static void enableRecordingConnection(String location) { + Directory dir = getRecordingSink(location, _kRecordingType); + _openChannel = (Uri uri) { + StreamChannel delegate = _defaultOpenChannel(uri); + return new RecordingVMServiceChannel(delegate, dir); + }; + } + + /// Enables VMService JSON-rpc replay mode. + /// + /// [location] must represent a directory to which VMService JSON-rpc + /// activity has been recorded (i.e. the result of having been previously + /// passed to [enableRecordingConnection]), or a [ToolExit] will be thrown. + static void enableReplayConnection(String location) { + Directory dir = getReplaySource(location, _kRecordingType); + _openChannel = (Uri uri) => new ReplayVMServiceChannel(dir); } /// Connect to a Dart VM Service at [httpUri]. @@ -57,8 +80,8 @@ class VMService { Duration requestTimeout: kDefaultRequestTimeout, }) { Uri wsUri = httpUri.replace(scheme: 'ws', path: fs.path.join(httpUri.path, 'ws')); - StreamChannel channel = _openChannel(wsUri); - rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(channel.cast())); + StreamChannel channel = _openChannel(wsUri); + rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(channel)); return new VMService._(peer, httpUri, wsUri, requestTimeout); } diff --git a/packages/flutter_tools/lib/src/vmservice_record_replay.dart b/packages/flutter_tools/lib/src/vmservice_record_replay.dart new file mode 100644 index 00000000000..8afe56060b1 --- /dev/null +++ b/packages/flutter_tools/lib/src/vmservice_record_replay.dart @@ -0,0 +1,299 @@ +// Copyright 2017 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:file/file.dart'; +import 'package:stream_channel/stream_channel.dart'; + +import 'base/io.dart'; +import 'base/process.dart'; +import 'globals.dart'; + +const String _kManifest = 'MANIFEST.txt'; +const String _kRequest = 'request'; +const String _kResponse = 'response'; +const String _kId = 'id'; +const String _kType = 'type'; +const String _kData = 'data'; + +/// A [StreamChannel] that expects VM service (JSON-rpc) protocol messages and +/// serializes all such messages to the file system for later playback. +class RecordingVMServiceChannel extends DelegatingStreamChannel { + final List<_Message> _messages = <_Message>[]; + + _RecordingStream _streamRecorder; + _RecordingSink _sinkRecorder; + + RecordingVMServiceChannel(StreamChannel delegate, Directory location) + : super(delegate) { + addShutdownHook(() async { + // Sort the messages such that they are ordered + // `[request1, response1, request2, response2, ...]`. This serves no + // other purpose than to make the serialized format more human-readable. + _messages.sort((_Message message1, _Message message2) { + int id1 = message1.id; + int id2 = message2.id; + int result = id1.compareTo(id2); + if (result != 0) { + return result; + } else if (message1.type == _kRequest) { + return -1; + } else { + return 1; + } + }); + + File file = _getManifest(location); + String json = new JsonEncoder.withIndent(' ').convert(_messages); + await file.writeAsString(json, flush: true); + }); + } + + @override + Stream get stream { + if (_streamRecorder == null) { + _streamRecorder = new _RecordingStream(super.stream, _messages); + } + return _streamRecorder.stream; + } + + @override + StreamSink get sink { + if (_sinkRecorder == null) { + _sinkRecorder = new _RecordingSink(super.sink, _messages); + } + return _sinkRecorder; + } +} + +/// Base class for request and response JSON-rpc messages. +abstract class _Message { + final String type; + final Map data; + + _Message(this.type, this.data); + + factory _Message.fromRecording(Map recordingData) { + return recordingData[_kType] == _kRequest + ? new _Request(recordingData[_kData]) + : new _Response(recordingData[_kData]); + } + + int get id => data[_kId]; + + /// Allows [JsonEncoder] to properly encode objects of this type. + Map toJson() { + return { + _kType: type, + _kData: data, + }; + } +} + +/// A VM service JSON-rpc request (sent to the VM). +class _Request extends _Message { + _Request(Map data) : super(_kRequest, data); + _Request.fromString(String data) : this(JSON.decoder.convert(data)); +} + +/// A VM service JSON-rpc response (from the VM). +class _Response extends _Message { + _Response(Map data) : super(_kResponse, data); + _Response.fromString(String data) : this(JSON.decoder.convert(data)); +} + +/// A matching request/response pair. +/// +/// A request and response match by virtue of having matching +/// [IDs](_Message.id). +class _Transaction { + _Request request; + _Response response; +} + +/// A helper class that monitors a [Stream] of VM service JSON-rpc responses +/// and saves the responses to a recording. +class _RecordingStream { + final Stream _delegate; + final StreamController _controller; + final List<_Message> _recording; + StreamSubscription _subscription; + + _RecordingStream(Stream stream, this._recording) + : _delegate = stream, + _controller = stream.isBroadcast + ? new StreamController.broadcast() + : new StreamController() { + _controller.onListen = () { + assert(_subscription == null); + _subscription = _listenToStream(); + }; + _controller.onCancel = () async { + assert(_subscription != null); + await _subscription.cancel(); + _subscription = null; + }; + _controller.onPause = () { + assert(_subscription != null && !_subscription.isPaused); + _subscription.pause(); + }; + _controller.onResume = () { + assert(_subscription != null && _subscription.isPaused); + _subscription.resume(); + }; + } + + StreamSubscription _listenToStream() { + return _delegate.listen( + (String element) { + _recording.add(new _Response.fromString(element)); + _controller.add(element); + }, + onError: (dynamic error, StackTrace stackTrace) { + // We currently don't support recording of errors. + _controller.addError(error, stackTrace); + }, + onDone: () { + _controller.close(); + }, + ); + } + + /// The wrapped [Stream] to expose to callers. + Stream get stream => _controller.stream; +} + +/// A [StreamSink] that monitors VM service JSON-rpc requests and saves the +/// requests to a recording. +class _RecordingSink implements StreamSink { + final StreamSink _delegate; + final List<_Message> _recording; + + _RecordingSink(this._delegate, this._recording); + + @override + Future close() => _delegate.close(); + + @override + Future get done => _delegate.done; + + @override + void add(String data) { + _delegate.add(data); + _recording.add(new _Request.fromString(data)); + } + + @override + void addError(dynamic errorEvent, [StackTrace stackTrace]) { + throw new UnimplementedError('Add support for this if the need ever arises'); + } + + @override + Future addStream(Stream stream) { + throw new UnimplementedError('Add support for this if the need ever arises'); + } +} + +/// A [StreamChannel] that expects VM service (JSON-rpc) requests to be written +/// to its [StreamChannel.sink], looks up those requests in a recording, and +/// replays the corresponding responses back from the recording. +class ReplayVMServiceChannel extends StreamChannelMixin { + final Map _transactions; + final StreamController _controller = new StreamController(); + _ReplaySink _replaySink; + + ReplayVMServiceChannel(Directory location) + : _transactions = _loadTransactions(location); + + static Map _loadTransactions(Directory location) { + File file = _getManifest(location); + String json = file.readAsStringSync(); + Iterable<_Message> messages = JSON.decoder.convert(json).map<_Message>(_toMessage); + Map transactions = {}; + for (_Message message in messages) { + _Transaction transaction = + transactions.putIfAbsent(message.id, () => new _Transaction()); + if (message.type == _kRequest) { + assert(transaction.request == null); + transaction.request = message; + } else { + assert(transaction.response == null); + transaction.response = message; + } + } + return transactions; + } + + static _Message _toMessage(Map jsonData) { + return new _Message.fromRecording(jsonData); + } + + void send(_Request request) { + if (!_transactions.containsKey(request.id)) + throw new ArgumentError('No matching invocation found'); + _Transaction transaction = _transactions.remove(request.id); + // TODO(tvolkert): validate that `transaction.request` matches `request` + if (transaction.response == null) { + // This signals that when we were recording, the VM shut down before + // we received the response. This is typically due to the user quitting + // the app runner. We follow suit here and exit. + printStatus('Exiting due to dangling request'); + exit(0); + } else { + _controller.add(JSON.encoder.convert(transaction.response.data)); + if (_transactions.isEmpty) + _controller.close(); + } + } + + @override + StreamSink get sink { + if (_replaySink == null) + _replaySink = new _ReplaySink(this); + return _replaySink; + } + + @override + Stream get stream => _controller.stream; +} + +class _ReplaySink implements StreamSink { + final ReplayVMServiceChannel channel; + final Completer _completer = new Completer(); + + _ReplaySink(this.channel); + + @override + Future close() { + _completer.complete(); + return _completer.future; + } + + @override + Future get done => _completer.future; + + @override + void add(String data) { + if (_completer.isCompleted) + throw new StateError('Sink already closed'); + channel.send(new _Request.fromString(data)); + } + + @override + void addError(dynamic errorEvent, [StackTrace stackTrace]) { + throw new UnimplementedError('Add support for this if the need ever arises'); + } + + @override + Future addStream(Stream stream) { + throw new UnimplementedError('Add support for this if the need ever arises'); + } +} + +File _getManifest(Directory location) { + String path = location.fileSystem.path.join(location.path, _kManifest); + return location.fileSystem.file(path); +}