mirror of
https://github.com/flutter/flutter.git
synced 2025-06-03 00:51:18 +00:00
99 lines
3.0 KiB
Dart
99 lines
3.0 KiB
Dart
// Copyright 2014 The Flutter Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
import 'dart:async';
|
|
import 'dart:collection';
|
|
|
|
import '../globals.dart' as globals;
|
|
|
|
/// A closure type used by the [TaskQueue].
|
|
typedef TaskQueueClosure<T> = Future<T> Function();
|
|
|
|
/// A task queue of Futures to be completed in parallel, throttling
|
|
/// the number of simultaneous tasks.
|
|
///
|
|
/// The tasks return results of type T.
|
|
class TaskQueue<T> {
|
|
/// Creates a task queue with a maximum number of simultaneous jobs.
|
|
/// The [maxJobs] parameter defaults to the number of CPU cores on the
|
|
/// system.
|
|
TaskQueue({int? maxJobs})
|
|
: maxJobs = maxJobs ?? globals.platform.numberOfProcessors;
|
|
|
|
/// The maximum number of jobs that this queue will run simultaneously.
|
|
final int maxJobs;
|
|
|
|
final Queue<_TaskQueueItem<T>> _pendingTasks = Queue<_TaskQueueItem<T>>();
|
|
final Set<_TaskQueueItem<T>> _activeTasks = <_TaskQueueItem<T>>{};
|
|
final Set<Completer<void>> _completeListeners = <Completer<void>>{};
|
|
|
|
/// Returns a future that completes when all tasks in the [TaskQueue] are
|
|
/// complete.
|
|
Future<void> get tasksComplete {
|
|
// In case this is called when there are no tasks, we want it to
|
|
// signal complete immediately.
|
|
if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
|
|
return Future<void>.value();
|
|
}
|
|
final Completer<void> completer = Completer<void>();
|
|
_completeListeners.add(completer);
|
|
return completer.future;
|
|
}
|
|
|
|
/// Adds a single closure to the task queue, returning a future that
|
|
/// completes when the task completes.
|
|
Future<T> add(TaskQueueClosure<T> task) {
|
|
final Completer<T> completer = Completer<T>();
|
|
_pendingTasks.add(_TaskQueueItem<T>(task, completer));
|
|
if (_activeTasks.length < maxJobs) {
|
|
_processTask();
|
|
}
|
|
return completer.future;
|
|
}
|
|
|
|
// Process a single task.
|
|
void _processTask() {
|
|
if (_pendingTasks.isNotEmpty && _activeTasks.length <= maxJobs) {
|
|
final _TaskQueueItem<T> item = _pendingTasks.removeFirst();
|
|
_activeTasks.add(item);
|
|
item.onComplete = () {
|
|
_activeTasks.remove(item);
|
|
_processTask();
|
|
};
|
|
item.run();
|
|
} else {
|
|
_checkForCompletion();
|
|
}
|
|
}
|
|
|
|
void _checkForCompletion() {
|
|
if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
|
|
for (final Completer<void> completer in _completeListeners) {
|
|
if (!completer.isCompleted) {
|
|
completer.complete();
|
|
}
|
|
}
|
|
_completeListeners.clear();
|
|
}
|
|
}
|
|
}
|
|
|
|
class _TaskQueueItem<T> {
|
|
_TaskQueueItem(this._closure, this._completer, {this.onComplete});
|
|
|
|
final TaskQueueClosure<T> _closure;
|
|
final Completer<T> _completer;
|
|
void Function()? onComplete;
|
|
|
|
Future<void> run() async {
|
|
try {
|
|
_completer.complete(await _closure());
|
|
} catch (e) { // ignore: avoid_catches_without_on_clauses, forwards to Future
|
|
_completer.completeError(e);
|
|
} finally {
|
|
onComplete?.call();
|
|
}
|
|
}
|
|
}
|