From c8b38b20bfaf5c28e7136c574e87e3ad84b9c00d Mon Sep 17 00:00:00 2001 From: Lau Ching Jun Date: Mon, 10 Apr 2023 16:20:01 -0700 Subject: [PATCH] Debounce the proxied connection over proxied devices. (#124540) Debounce the proxied connection over proxied devices. --- .../lib/src/commands/daemon.dart | 3 +- .../proxied_devices/debounce_data_stream.dart | 63 +++++++ .../lib/src/proxied_devices/devices.dart | 3 +- .../debounce_data_stream_test.dart | 174 ++++++++++++++++++ 4 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 packages/flutter_tools/lib/src/proxied_devices/debounce_data_stream.dart create mode 100644 packages/flutter_tools/test/general.shard/proxied_devices/debounce_data_stream_test.dart diff --git a/packages/flutter_tools/lib/src/commands/daemon.dart b/packages/flutter_tools/lib/src/commands/daemon.dart index e9f03c5c279..9e90e8f2180 100644 --- a/packages/flutter_tools/lib/src/commands/daemon.dart +++ b/packages/flutter_tools/lib/src/commands/daemon.dart @@ -25,6 +25,7 @@ import '../emulator.dart'; import '../features.dart'; import '../globals.dart' as globals; import '../project.dart'; +import '../proxied_devices/debounce_data_stream.dart'; import '../proxied_devices/file_transfer.dart'; import '../resident_runner.dart'; import '../run_cold.dart'; @@ -1463,7 +1464,7 @@ class ProxyDomain extends Domain { } _forwardedConnections[id] = socket; - socket.listen((List data) { + debounceDataStream(socket).listen((List data) { sendEvent('proxy.data.$id', null, data); }, onError: (Object error, StackTrace stackTrace) { // Socket error, probably disconnected. diff --git a/packages/flutter_tools/lib/src/proxied_devices/debounce_data_stream.dart b/packages/flutter_tools/lib/src/proxied_devices/debounce_data_stream.dart new file mode 100644 index 00000000000..15b5649723e --- /dev/null +++ b/packages/flutter_tools/lib/src/proxied_devices/debounce_data_stream.dart @@ -0,0 +1,63 @@ +// Copyright 2014 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; +import 'dart:typed_data'; + +/// Merges the values in a stream that are sent less than [duration] apart. +/// +/// To minimize latency, the merged stream will always emit the first value that +/// is sent after a pause of at least [duration] long. After the first message, +/// all values that are sent within [duration] will be merged into one. +Stream debounceDataStream(Stream stream, [Duration duration = const Duration(milliseconds: 100)]) { + final StreamController controller = StreamController(); + final BytesBuilder buffer = BytesBuilder(copy: false); + + bool isDone = false; + Timer? timer; + + // Called when timer triggers, sends out the buffered messages. + void onTimer() { + if (buffer.isNotEmpty) { + controller.add(buffer.toBytes()); + buffer.clear(); + if (isDone) { + controller.close(); + } else { + // Start another timer even if we have nothing to send right now, so + // that outgoing messages are at least [duration] apart. + timer = Timer(duration, onTimer); + } + } else { + timer = null; + } + } + + controller.onListen = () { + final StreamSubscription subscription = stream.listen((Uint8List data) { + if (timer == null) { + controller.add(data); + // Start the timer to make sure that the next message is at least [duration] apart. + timer = Timer(duration, onTimer); + } else { + buffer.add(data); + } + }, onError: (Object error, StackTrace stackTrace) { + // Forward the error. + controller.addError(error, stackTrace); + }, onDone: () { + isDone = true; + // Delay closing the channel if we still have buffered data. + if (timer == null) { + controller.close(); + } + }); + + controller.onCancel = () { + subscription.cancel(); + }; + }; + + return controller.stream; +} diff --git a/packages/flutter_tools/lib/src/proxied_devices/devices.dart b/packages/flutter_tools/lib/src/proxied_devices/devices.dart index 8a71eedf04f..8d9be59ab4c 100644 --- a/packages/flutter_tools/lib/src/proxied_devices/devices.dart +++ b/packages/flutter_tools/lib/src/proxied_devices/devices.dart @@ -18,6 +18,7 @@ import '../daemon.dart'; import '../device.dart'; import '../device_port_forwarder.dart'; import '../project.dart'; +import 'debounce_data_stream.dart'; import 'file_transfer.dart'; bool _isNullable() => null is T; @@ -563,7 +564,7 @@ class ProxiedPortForwarder extends DevicePortForwarder { // Do nothing here. }, )); - socket.listen((Uint8List data) { + debounceDataStream(socket).listen((Uint8List data) { unawaited(connection.sendRequest('proxy.write', { 'id': id, }, data).then( diff --git a/packages/flutter_tools/test/general.shard/proxied_devices/debounce_data_stream_test.dart b/packages/flutter_tools/test/general.shard/proxied_devices/debounce_data_stream_test.dart new file mode 100644 index 00000000000..b0724ed0cee --- /dev/null +++ b/packages/flutter_tools/test/general.shard/proxied_devices/debounce_data_stream_test.dart @@ -0,0 +1,174 @@ +// Copyright 2014 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:fake_async/fake_async.dart'; +import 'package:flutter_tools/src/proxied_devices/debounce_data_stream.dart'; + +import '../../src/common.dart'; + +void main() { + group('debounceDataStreams', () { + late FakeAsync fakeAsync; + late StreamController source; + late Stream output; + const Duration debounceDuration = Duration(seconds: 10); + const Duration smallDuration = Duration(milliseconds: 10); + + void addToSource(int value) { + source.add(Uint8List.fromList([value])); + } + + setUp(() { + fakeAsync = FakeAsync(); + fakeAsync.run((FakeAsync time) { + source = StreamController(); + output = debounceDataStream(source.stream, debounceDuration); + }); + }); + + testWithoutContext('does not listen if returned stream is not listened to', () { + expect(source.hasListener, false); + output.listen(dummy); + expect(source.hasListener, true); + }); + + testWithoutContext('forwards data normally is all data if longer than duration apart', () { + fakeAsync.run((FakeAsync time) { + final List outputItems = []; + output.listen(outputItems.add); + + addToSource(1); + time.elapse(debounceDuration + smallDuration); + addToSource(2); + time.elapse(debounceDuration + smallDuration); + addToSource(3); + time.elapse(debounceDuration + smallDuration); + + expect(outputItems, >[ + [1], + [2], + [3], + ]); + }); + }); + + testWithoutContext('merge data after the first if sent within duration', () { + fakeAsync.run((FakeAsync time) { + final List outputItems = []; + output.listen(outputItems.add); + + addToSource(1); + time.elapse(smallDuration); + addToSource(2); + time.elapse(smallDuration); + addToSource(3); + time.elapse(debounceDuration + smallDuration); + + expect(outputItems, >[ + [1], + [2, 3], + ]); + }); + }); + + testWithoutContext('output data in separate chunks if time between them is longer than duration', () { + fakeAsync.run((FakeAsync time) { + final List outputItems = []; + output.listen(outputItems.add); + + addToSource(1); + time.elapse(smallDuration); + addToSource(2); + time.elapse(smallDuration); + addToSource(3); + time.elapse(debounceDuration + smallDuration); + addToSource(4); + time.elapse(smallDuration); + addToSource(5); + time.elapse(debounceDuration + smallDuration); + + expect(outputItems, >[ + [1], + [2, 3], + [4, 5], + ]); + }); + }); + + testWithoutContext('sends the last chunk after debounce duration', () { + fakeAsync.run((FakeAsync time) { + final List outputItems = []; + output.listen(outputItems.add); + + addToSource(1); + time.flushMicrotasks(); + expect(outputItems, >[[1]]); + + time.elapse(smallDuration); + addToSource(2); + time.elapse(smallDuration); + addToSource(3); + expect(outputItems, >[[1]]); + + time.elapse(debounceDuration + smallDuration); + expect(outputItems, >[ + [1], + [2, 3], + ]); + }); + }); + + testWithoutContext('close if source stream is closed', () { + fakeAsync.run((FakeAsync time) { + bool isDone = false; + output.listen(dummy, onDone: () => isDone = true); + expect(isDone, false); + source.close(); + time.flushMicrotasks(); + expect(isDone, true); + }); + }); + + testWithoutContext('delay close until after last chunk is sent', () { + fakeAsync.run((FakeAsync time) { + final List outputItems = []; + bool isDone = false; + output.listen(outputItems.add, onDone: () => isDone = true); + + addToSource(1); + time.flushMicrotasks(); + expect(outputItems, >[[1]]); + + addToSource(2); + source.close(); + time.elapse(smallDuration); + expect(isDone, false); + expect(outputItems, >[[1]]); + + time.elapse(debounceDuration + smallDuration); + expect(outputItems, >[ + [1], + [2], + ]); + expect(isDone, true); + }); + }); + + testWithoutContext('close if returned stream is closed', () { + fakeAsync.run((FakeAsync time) { + bool isCancelled = false; + source.onCancel = () => isCancelled = true; + final StreamSubscription subscription = output.listen(dummy); + expect(isCancelled, false); + subscription.cancel(); + expect(isCancelled, true); + }); + }); + }); +} + +Uint8List dummy(Uint8List data) => data;