diff --git a/dev/devicelab/lib/framework/framework.dart b/dev/devicelab/lib/framework/framework.dart index d64fe2e1502..9036f22fcd5 100644 --- a/dev/devicelab/lib/framework/framework.dart +++ b/dev/devicelab/lib/framework/framework.dart @@ -66,6 +66,12 @@ Future task(TaskFunction task, { ProcessManager? processManager }) a class _TaskRunner { _TaskRunner(this.task, this.processManager) { + final String successResponse = json.encode( + const { + 'result': 'success', + }, + ); + registerExtension('ext.cocoonRunTask', (String method, Map parameters) async { final Duration? taskTimeout = parameters.containsKey('timeoutInMinutes') @@ -82,11 +88,25 @@ class _TaskRunner { localEngine: localEngine, localEngineHost: localEngineHost, ); + const Duration taskResultReceivedTimeout = Duration(seconds: 30); + _taskResultReceivedTimeout = Timer( + taskResultReceivedTimeout, + () { + logger.severe('Task runner did not acknowledge task results in $taskResultReceivedTimeout.'); + _closeKeepAlivePort(); + exitCode = 1; + } + ); return ServiceExtensionResponse.result(json.encode(result.toJson())); }); registerExtension('ext.cocoonRunnerReady', (String method, Map parameters) async { - return ServiceExtensionResponse.result('"ready"'); + return ServiceExtensionResponse.result(successResponse); + }); + registerExtension('ext.cocoonTaskResultReceived', + (String method, Map parameters) async { + _closeKeepAlivePort(); + return ServiceExtensionResponse.result(successResponse); }); } @@ -104,6 +124,7 @@ class _TaskRunner { // TODO(ianh): workaround for https://github.com/dart-lang/sdk/issues/23797 RawReceivePort? _keepAlivePort; Timer? _startTaskTimeout; + Timer? 
_taskResultReceivedTimeout; bool _taskStarted = false; final Completer _completer = Completer(); @@ -210,7 +231,6 @@ class _TaskRunner { } finally { await checkForRebootRequired(); await forceQuitRunningProcesses(); - _closeKeepAlivePort(); } } @@ -269,6 +289,7 @@ class _TaskRunner { /// Disables the keepalive port, allowing the VM to exit. void _closeKeepAlivePort() { _startTaskTimeout?.cancel(); + _taskResultReceivedTimeout?.cancel(); _keepAlivePort?.close(); } diff --git a/dev/devicelab/lib/framework/runner.dart b/dev/devicelab/lib/framework/runner.dart index 8e3ec26c016..e05bda419f3 100644 --- a/dev/devicelab/lib/framework/runner.dart +++ b/dev/devicelab/lib/framework/runner.dart @@ -8,6 +8,7 @@ import 'dart:io'; import 'package:meta/meta.dart'; import 'package:vm_service/vm_service.dart'; +import 'package:vm_service/vm_service_io.dart'; import 'cocoon.dart'; import 'devices.dart'; @@ -237,11 +238,19 @@ Future runTask( print('[$taskName] Connected to VM server.'); isolateParams = isolateParams == null ? {} : Map.of(isolateParams); isolateParams['runProcessCleanup'] = terminateStrayDartProcesses.toString(); - final Map taskResultJson = (await result.vmService.callServiceExtension( + final VmService service = result.vmService; + final String isolateId = result.isolate.id!; + final Map taskResultJson = (await service.callServiceExtension( 'ext.cocoonRunTask', args: isolateParams, - isolateId: result.isolate.id, + isolateId: isolateId, )).json!; + // Notify the task process that the task result has been received and it + // can proceed to shutdown. 
+ await _acknowledgeTaskResultReceived( + service: service, + isolateId: isolateId, + ); final TaskResult taskResult = TaskResult.fromJson(taskResultJson); final int exitCode = await runner.exitCode; print('[$taskName] Process terminated with exit code $exitCode.'); @@ -270,9 +279,6 @@ Future _connectToRunnerIsolate(Uri vmServiceUri) async { while (true) { try { - // Make sure VM server is up by successfully opening and closing a socket. - await (await WebSocket.connect(url)).close(); - // Look up the isolate. final VmService client = await vmServiceConnectUri(url); VM vm = await client.getVM(); @@ -281,8 +287,9 @@ Future _connectToRunnerIsolate(Uri vmServiceUri) async { vm = await client.getVM(); } final IsolateRef isolate = vm.isolates!.first; + // Sanity check to ensure we're talking with the main isolate. final Response response = await client.callServiceExtension('ext.cocoonRunnerReady', isolateId: isolate.id); - if (response.json!['response'] != 'ready') { + if (response.json!['result'] != 'success') { throw 'not ready yet'; } return ConnectionResult(client, isolate); @@ -301,37 +308,23 @@ Future _connectToRunnerIsolate(Uri vmServiceUri) async { } } +Future _acknowledgeTaskResultReceived({ + required VmService service, + required String isolateId, + }) async { + try { + await service.callServiceExtension( + 'ext.cocoonTaskResultReceived', + isolateId: isolateId, + ); + } on RPCError { + // The target VM may shutdown before the response is received. + } +} + class ConnectionResult { ConnectionResult(this.vmService, this.isolate); final VmService vmService; final IsolateRef isolate; } - -/// The cocoon client sends an invalid VM service response, we need to intercept it. -Future vmServiceConnectUri(String wsUri, {Log? 
log}) async { - final WebSocket socket = await WebSocket.connect(wsUri); - final StreamController controller = StreamController(); - final Completer streamClosedCompleter = Completer(); - socket.listen( - (dynamic data) { - final Map rawData = json.decode(data as String) as Map ; - if (rawData['result'] == 'ready') { - rawData['result'] = {'response': 'ready'}; - controller.add(json.encode(rawData)); - } else { - controller.add(data); - } - }, - onError: (Object err, StackTrace stackTrace) => controller.addError(err, stackTrace), - onDone: () => streamClosedCompleter.complete(), - ); - - return VmService( - controller.stream, - (String message) => socket.add(message), - log: log, - disposeHandler: () => socket.close(), - streamClosed: streamClosedCompleter.future, - ); -} diff --git a/dev/devicelab/test/runner_test.dart b/dev/devicelab/test/runner_test.dart index e8684321489..9ec97c4d9ec 100644 --- a/dev/devicelab/test/runner_test.dart +++ b/dev/devicelab/test/runner_test.dart @@ -3,6 +3,7 @@ // found in the LICENSE file. import 'package:flutter_devicelab/framework/runner.dart'; +import 'package:vm_service/vm_service.dart'; import 'common.dart'; @@ -40,5 +41,33 @@ void main() { expect(printLog[0], 'Consistently failed across all 3 executions.'); expect(printLog[1], 'flaky: false'); }); + + test('Ensures task results are received before task process shuts down.', () async { + // Regression test for https://github.com/flutter/flutter/issues/155475 + // + // Runs multiple concurrent instances of a short-lived task in an effort to + // trigger the race between the VM service processing the response from + // ext.cocoonRunTask and the VM shutting down, which will throw a RPCError + // with a "Service connection disposed" message. + // + // Obviously this isn't foolproof, but this test becoming flaky or failing + // consistently should signal that we're encountering a shutdown race + // somewhere. 
+ const int runs = 30; + try { + await Future.wait( + <Future<void>>[ + for (int i = 0; i < runs; ++i) + runTasks( + <String>['smoke_test_success'], + isolateParams: isolateParams, + ), + ], + eagerError: true, + ); + } on RPCError catch (e) { + fail('Unexpected RPCError: $e'); + } + }, timeout: const Timeout.factor(2)); }); }