From 2c84e63ba73233e2c8f4297cb0ba828a8e6106da Mon Sep 17 00:00:00 2001 From: Ben Konyi Date: Wed, 2 Oct 2024 17:01:24 -0400 Subject: [PATCH] [ Cocoon ] Wait for task results to be received by the task runner before shutting down the task process (#156002) Prior to this fix, `_TaskRunner.run` would immediately cleanup the keep-alive port once the task completed, which would result in the isolate shutting down as soon as the task result was returned from `ext.cocoonRunTask` callback in the form of a `ServiceExtensionResponse`. Since the service extension response is processed by the service isolate, it was possible for the VM to start shutting down before the service isolate could send the task result data back to the task runner. This change introduces a new service extension, `ext.cocoonTaskResultReceived`, that the task runner invokes after it receives the task result from `ext.cocoonRunTask`, notifying the task process that it can close the keep-alive port and shutdown. Fixes https://github.com/flutter/flutter/issues/155475 --- dev/devicelab/lib/framework/framework.dart | 25 ++++++++- dev/devicelab/lib/framework/runner.dart | 61 ++++++++++------------ dev/devicelab/test/runner_test.dart | 29 ++++++++++ 3 files changed, 79 insertions(+), 36 deletions(-) diff --git a/dev/devicelab/lib/framework/framework.dart b/dev/devicelab/lib/framework/framework.dart index d64fe2e1502..9036f22fcd5 100644 --- a/dev/devicelab/lib/framework/framework.dart +++ b/dev/devicelab/lib/framework/framework.dart @@ -66,6 +66,12 @@ Future task(TaskFunction task, { ProcessManager? processManager }) a class _TaskRunner { _TaskRunner(this.task, this.processManager) { + final String successResponse = json.encode( + const { + 'result': 'success', + }, + ); + registerExtension('ext.cocoonRunTask', (String method, Map parameters) async { final Duration? taskTimeout = parameters.containsKey('timeoutInMinutes') @@ -82,11 +88,25 @@ class _TaskRunner { localEngine: localEngine, localEngineHost: localEngineHost, ); + const Duration taskResultReceivedTimeout = Duration(seconds: 30); + _taskResultReceivedTimeout = Timer( + taskResultReceivedTimeout, + () { + logger.severe('Task runner did not acknowledge task results in $taskResultReceivedTimeout.'); + _closeKeepAlivePort(); + exitCode = 1; + } + ); return ServiceExtensionResponse.result(json.encode(result.toJson())); }); registerExtension('ext.cocoonRunnerReady', (String method, Map parameters) async { - return ServiceExtensionResponse.result('"ready"'); + return ServiceExtensionResponse.result(successResponse); + }); + registerExtension('ext.cocoonTaskResultReceived', + (String method, Map parameters) async { + _closeKeepAlivePort(); + return ServiceExtensionResponse.result(successResponse); }); } @@ -104,6 +124,7 @@ class _TaskRunner { // TODO(ianh): workaround for https://github.com/dart-lang/sdk/issues/23797 RawReceivePort? _keepAlivePort; Timer? _startTaskTimeout; + Timer? _taskResultReceivedTimeout; bool _taskStarted = false; final Completer _completer = Completer(); @@ -210,7 +231,6 @@ class _TaskRunner { } finally { await checkForRebootRequired(); await forceQuitRunningProcesses(); - _closeKeepAlivePort(); } } @@ -269,6 +289,7 @@ class _TaskRunner { /// Disables the keepalive port, allowing the VM to exit. void _closeKeepAlivePort() { _startTaskTimeout?.cancel(); + _taskResultReceivedTimeout?.cancel(); _keepAlivePort?.close(); } diff --git a/dev/devicelab/lib/framework/runner.dart b/dev/devicelab/lib/framework/runner.dart index 8e3ec26c016..e05bda419f3 100644 --- a/dev/devicelab/lib/framework/runner.dart +++ b/dev/devicelab/lib/framework/runner.dart @@ -8,6 +8,7 @@ import 'dart:io'; import 'package:meta/meta.dart'; import 'package:vm_service/vm_service.dart'; +import 'package:vm_service/vm_service_io.dart'; import 'cocoon.dart'; import 'devices.dart'; @@ -237,11 +238,19 @@ Future runTask( print('[$taskName] Connected to VM server.'); isolateParams = isolateParams == null ? {} : Map.of(isolateParams); isolateParams['runProcessCleanup'] = terminateStrayDartProcesses.toString(); - final Map taskResultJson = (await result.vmService.callServiceExtension( + final VmService service = result.vmService; + final String isolateId = result.isolate.id!; + final Map taskResultJson = (await service.callServiceExtension( 'ext.cocoonRunTask', args: isolateParams, - isolateId: result.isolate.id, + isolateId: isolateId, )).json!; + // Notify the task process that the task result has been received and it + // can proceed to shutdown. + await _acknowledgeTaskResultReceived( + service: service, + isolateId: isolateId, + ); final TaskResult taskResult = TaskResult.fromJson(taskResultJson); final int exitCode = await runner.exitCode; print('[$taskName] Process terminated with exit code $exitCode.'); @@ -270,9 +279,6 @@ Future _connectToRunnerIsolate(Uri vmServiceUri) async { while (true) { try { - // Make sure VM server is up by successfully opening and closing a socket. - await (await WebSocket.connect(url)).close(); - // Look up the isolate. final VmService client = await vmServiceConnectUri(url); VM vm = await client.getVM(); @@ -281,8 +287,9 @@ Future _connectToRunnerIsolate(Uri vmServiceUri) async { vm = await client.getVM(); } final IsolateRef isolate = vm.isolates!.first; + // Sanity check to ensure we're talking with the main isolate. final Response response = await client.callServiceExtension('ext.cocoonRunnerReady', isolateId: isolate.id); - if (response.json!['response'] != 'ready') { + if (response.json!['result'] != 'success') { throw 'not ready yet'; } return ConnectionResult(client, isolate); @@ -301,37 +308,23 @@ Future _connectToRunnerIsolate(Uri vmServiceUri) async { } } +Future _acknowledgeTaskResultReceived({ + required VmService service, + required String isolateId, + }) async { + try { + await service.callServiceExtension( + 'ext.cocoonTaskResultReceived', + isolateId: isolateId, + ); + } on RPCError { + // The target VM may shutdown before the response is received. + } +} + class ConnectionResult { ConnectionResult(this.vmService, this.isolate); final VmService vmService; final IsolateRef isolate; } - -/// The cocoon client sends an invalid VM service response, we need to intercept it. -Future vmServiceConnectUri(String wsUri, {Log? log}) async { - final WebSocket socket = await WebSocket.connect(wsUri); - final StreamController controller = StreamController(); - final Completer streamClosedCompleter = Completer(); - socket.listen( - (dynamic data) { - final Map rawData = json.decode(data as String) as Map ; - if (rawData['result'] == 'ready') { - rawData['result'] = {'response': 'ready'}; - controller.add(json.encode(rawData)); - } else { - controller.add(data); - } - }, - onError: (Object err, StackTrace stackTrace) => controller.addError(err, stackTrace), - onDone: () => streamClosedCompleter.complete(), - ); - - return VmService( - controller.stream, - (String message) => socket.add(message), - log: log, - disposeHandler: () => socket.close(), - streamClosed: streamClosedCompleter.future, - ); -} diff --git a/dev/devicelab/test/runner_test.dart b/dev/devicelab/test/runner_test.dart index e8684321489..9ec97c4d9ec 100644 --- a/dev/devicelab/test/runner_test.dart +++ b/dev/devicelab/test/runner_test.dart @@ -3,6 +3,7 @@ // found in the LICENSE file. import 'package:flutter_devicelab/framework/runner.dart'; +import 'package:vm_service/vm_service.dart'; import 'common.dart'; @@ -40,5 +41,33 @@ void main() { expect(printLog[0], 'Consistently failed across all 3 executions.'); expect(printLog[1], 'flaky: false'); }); + + test('Ensures task results are received before task process shuts down.', () async { + // Regression test for https://github.com/flutter/flutter/issues/155475 + // + // Runs multiple concurrent instances of a short-lived task in an effort to + // trigger the race between the VM service processing the response from + // ext.cocoonRunTask and the VM shutting down, which will throw a RPCError + // with a "Service connection disposed" message. + // + // Obviously this isn't foolproof, but this test becoming flaky or failing + // consistently should signal that we're encountering a shutdown race + // somewhere. + const int runs = 30; + try { + await Future.wait( + >[ + for (int i = 0; i < runs; ++i) + runTasks( + ['smoke_test_success'], + isolateParams: isolateParams, + ), + ], + eagerError: true, + ); + } on RPCError catch (e) { + fail('Unexpected RPCError: $e'); + } + }, timeout: const Timeout.factor(2)); }); }