mirror of
https://github.com/flutter/flutter.git
synced 2025-06-03 00:51:18 +00:00
[ Cocoon ] Wait for task results to be received by the task runner before shutting down the task process (#156002)
Prior to this fix, `_TaskRunner.run` would immediately cleanup the keep-alive port once the task completed, which would result in the isolate shutting down as soon as the task result was returned from `ext.cocoonRunTask` callback in the form of a `ServiceExtensionResponse`. Since the service extension response is processed by the service isolate, it was possible for the VM to start shutting down before the service isolate could send the task result data back to the task runner. This change introduces a new service extension, `ext.cocoonTaskResultReceived`, that the task runner invokes after it receives the task result from `ext.cocoonRunTask`, notifying the task process that it can close the keep-alive port and shutdown. Fixes https://github.com/flutter/flutter/issues/155475
This commit is contained in:
parent
d877d2875e
commit
2c84e63ba7
@ -66,6 +66,12 @@ Future<TaskResult> task(TaskFunction task, { ProcessManager? processManager }) a
|
||||
|
||||
class _TaskRunner {
|
||||
_TaskRunner(this.task, this.processManager) {
|
||||
final String successResponse = json.encode(
|
||||
const <String, String>{
|
||||
'result': 'success',
|
||||
},
|
||||
);
|
||||
|
||||
registerExtension('ext.cocoonRunTask',
|
||||
(String method, Map<String, String> parameters) async {
|
||||
final Duration? taskTimeout = parameters.containsKey('timeoutInMinutes')
|
||||
@ -82,11 +88,25 @@ class _TaskRunner {
|
||||
localEngine: localEngine,
|
||||
localEngineHost: localEngineHost,
|
||||
);
|
||||
const Duration taskResultReceivedTimeout = Duration(seconds: 30);
|
||||
_taskResultReceivedTimeout = Timer(
|
||||
taskResultReceivedTimeout,
|
||||
() {
|
||||
logger.severe('Task runner did not acknowledge task results in $taskResultReceivedTimeout.');
|
||||
_closeKeepAlivePort();
|
||||
exitCode = 1;
|
||||
}
|
||||
);
|
||||
return ServiceExtensionResponse.result(json.encode(result.toJson()));
|
||||
});
|
||||
registerExtension('ext.cocoonRunnerReady',
|
||||
(String method, Map<String, String> parameters) async {
|
||||
return ServiceExtensionResponse.result('"ready"');
|
||||
return ServiceExtensionResponse.result(successResponse);
|
||||
});
|
||||
registerExtension('ext.cocoonTaskResultReceived',
|
||||
(String method, Map<String, String> parameters) async {
|
||||
_closeKeepAlivePort();
|
||||
return ServiceExtensionResponse.result(successResponse);
|
||||
});
|
||||
}
|
||||
|
||||
@ -104,6 +124,7 @@ class _TaskRunner {
|
||||
// TODO(ianh): workaround for https://github.com/dart-lang/sdk/issues/23797
|
||||
RawReceivePort? _keepAlivePort;
|
||||
Timer? _startTaskTimeout;
|
||||
Timer? _taskResultReceivedTimeout;
|
||||
bool _taskStarted = false;
|
||||
|
||||
final Completer<TaskResult> _completer = Completer<TaskResult>();
|
||||
@ -210,7 +231,6 @@ class _TaskRunner {
|
||||
} finally {
|
||||
await checkForRebootRequired();
|
||||
await forceQuitRunningProcesses();
|
||||
_closeKeepAlivePort();
|
||||
}
|
||||
}
|
||||
|
||||
@ -269,6 +289,7 @@ class _TaskRunner {
|
||||
/// Disables the keepalive port, allowing the VM to exit.
|
||||
void _closeKeepAlivePort() {
|
||||
_startTaskTimeout?.cancel();
|
||||
_taskResultReceivedTimeout?.cancel();
|
||||
_keepAlivePort?.close();
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,7 @@ import 'dart:io';
|
||||
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:vm_service/vm_service.dart';
|
||||
import 'package:vm_service/vm_service_io.dart';
|
||||
|
||||
import 'cocoon.dart';
|
||||
import 'devices.dart';
|
||||
@ -237,11 +238,19 @@ Future<TaskResult> runTask(
|
||||
print('[$taskName] Connected to VM server.');
|
||||
isolateParams = isolateParams == null ? <String, String>{} : Map<String, String>.of(isolateParams);
|
||||
isolateParams['runProcessCleanup'] = terminateStrayDartProcesses.toString();
|
||||
final Map<String, dynamic> taskResultJson = (await result.vmService.callServiceExtension(
|
||||
final VmService service = result.vmService;
|
||||
final String isolateId = result.isolate.id!;
|
||||
final Map<String, dynamic> taskResultJson = (await service.callServiceExtension(
|
||||
'ext.cocoonRunTask',
|
||||
args: isolateParams,
|
||||
isolateId: result.isolate.id,
|
||||
isolateId: isolateId,
|
||||
)).json!;
|
||||
// Notify the task process that the task result has been received and it
|
||||
// can proceed to shutdown.
|
||||
await _acknowledgeTaskResultReceived(
|
||||
service: service,
|
||||
isolateId: isolateId,
|
||||
);
|
||||
final TaskResult taskResult = TaskResult.fromJson(taskResultJson);
|
||||
final int exitCode = await runner.exitCode;
|
||||
print('[$taskName] Process terminated with exit code $exitCode.');
|
||||
@ -270,9 +279,6 @@ Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async {
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
// Make sure VM server is up by successfully opening and closing a socket.
|
||||
await (await WebSocket.connect(url)).close();
|
||||
|
||||
// Look up the isolate.
|
||||
final VmService client = await vmServiceConnectUri(url);
|
||||
VM vm = await client.getVM();
|
||||
@ -281,8 +287,9 @@ Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async {
|
||||
vm = await client.getVM();
|
||||
}
|
||||
final IsolateRef isolate = vm.isolates!.first;
|
||||
// Sanity check to ensure we're talking with the main isolate.
|
||||
final Response response = await client.callServiceExtension('ext.cocoonRunnerReady', isolateId: isolate.id);
|
||||
if (response.json!['response'] != 'ready') {
|
||||
if (response.json!['result'] != 'success') {
|
||||
throw 'not ready yet';
|
||||
}
|
||||
return ConnectionResult(client, isolate);
|
||||
@ -301,37 +308,23 @@ Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async {
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _acknowledgeTaskResultReceived({
|
||||
required VmService service,
|
||||
required String isolateId,
|
||||
}) async {
|
||||
try {
|
||||
await service.callServiceExtension(
|
||||
'ext.cocoonTaskResultReceived',
|
||||
isolateId: isolateId,
|
||||
);
|
||||
} on RPCError {
|
||||
// The target VM may shutdown before the response is received.
|
||||
}
|
||||
}
|
||||
|
||||
class ConnectionResult {
|
||||
ConnectionResult(this.vmService, this.isolate);
|
||||
|
||||
final VmService vmService;
|
||||
final IsolateRef isolate;
|
||||
}
|
||||
|
||||
/// The cocoon client sends an invalid VM service response, we need to intercept it.
|
||||
Future<VmService> vmServiceConnectUri(String wsUri, {Log? log}) async {
|
||||
final WebSocket socket = await WebSocket.connect(wsUri);
|
||||
final StreamController<dynamic> controller = StreamController<dynamic>();
|
||||
final Completer<dynamic> streamClosedCompleter = Completer<dynamic>();
|
||||
socket.listen(
|
||||
(dynamic data) {
|
||||
final Map<String, dynamic> rawData = json.decode(data as String) as Map<String, dynamic> ;
|
||||
if (rawData['result'] == 'ready') {
|
||||
rawData['result'] = <String, dynamic>{'response': 'ready'};
|
||||
controller.add(json.encode(rawData));
|
||||
} else {
|
||||
controller.add(data);
|
||||
}
|
||||
},
|
||||
onError: (Object err, StackTrace stackTrace) => controller.addError(err, stackTrace),
|
||||
onDone: () => streamClosedCompleter.complete(),
|
||||
);
|
||||
|
||||
return VmService(
|
||||
controller.stream,
|
||||
(String message) => socket.add(message),
|
||||
log: log,
|
||||
disposeHandler: () => socket.close(),
|
||||
streamClosed: streamClosedCompleter.future,
|
||||
);
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
// found in the LICENSE file.
|
||||
|
||||
import 'package:flutter_devicelab/framework/runner.dart';
|
||||
import 'package:vm_service/vm_service.dart';
|
||||
|
||||
import 'common.dart';
|
||||
|
||||
@ -40,5 +41,33 @@ void main() {
|
||||
expect(printLog[0], 'Consistently failed across all 3 executions.');
|
||||
expect(printLog[1], 'flaky: false');
|
||||
});
|
||||
|
||||
test('Ensures task results are received before task process shuts down.', () async {
|
||||
// Regression test for https://github.com/flutter/flutter/issues/155475
|
||||
//
|
||||
// Runs multiple concurrent instances of a short-lived task in an effort to
|
||||
// trigger the race between the VM service processing the response from
|
||||
// ext.cocoonRunTask and the VM shutting down, which will throw a RPCError
|
||||
// with a "Service connection disposed" message.
|
||||
//
|
||||
// Obviously this isn't foolproof, but this test becoming flaky or failing
|
||||
// consistently should signal that we're encountering a shutdown race
|
||||
// somewhere.
|
||||
const int runs = 30;
|
||||
try {
|
||||
await Future.wait(
|
||||
<Future<void>>[
|
||||
for (int i = 0; i < runs; ++i)
|
||||
runTasks(
|
||||
<String>['smoke_test_success'],
|
||||
isolateParams: isolateParams,
|
||||
),
|
||||
],
|
||||
eagerError: true,
|
||||
);
|
||||
} on RPCError catch (e) {
|
||||
fail('Unexpected RPCError: $e');
|
||||
}
|
||||
}, timeout: const Timeout.factor(2));
|
||||
});
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user