title: Dart 异步编程date: 2021-07-15 19:52:57.588
updated: 2021-07-15 23:29:50.304
url: /?p=302
categories: Dart
tags:
异步编程
isolate机制
isolate的中文翻译是孤立、使…分离。
Dart是基于单线程模型的语言。但是在开发当中我们经常会进行耗时操作比如网络请求,这种耗时操作会堵塞我们的代码,所以在Dart也有并发机制,名叫isolate。APP的启动入口main
函数就是一个类似Android主线程的一个主isolate。和Java的Thread不同的是,Dart中的isolate无法共享内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import 'dart:isolate';
int i;
void main() { i = 10; ReceivePort receivePort = new ReceivePort(); Isolate.spawn(isolateMain, receivePort.sendPort);
receivePort.listen((message) { if (message is SendPort) { message.send("好呀好呀!"); } else { print("接到子isolate消息:" + message); } }); }
void isolateMain(SendPort sendPort) { print(i);
ReceivePort receivePort = new ReceivePort(); sendPort.send(receivePort.sendPort);
sendPort.send("去大保健吗?");
receivePort.listen((message) { print("接到主isolate消息:" + message); }); }
|
注意:isolate是并行执行的,当做线程就可以了。与事件队列的关系的话,具体个人没有考究,我姑且把它当做每个isolate都存在自己的事件列队。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import 'dart:io'; import 'dart:isolate';
void main() {
Isolate.spawn(i1, ""); Isolate.spawn(i2, "");
while(true){} }
void i1( msg) { print("isolate1 执行");
Future.doWhile((){ print("isolate1 future"); return true; }); }
void i2( msg) { print("isolate2 执行"); Future.doWhile((){ print("isolate2 future"); return true; });
}
|
event-loop
可以看到代码中,我们接收消息使用了`listene`函数来监听消息。假设我们现在在main方法最后加入`sleep`休眠,会不会影响`listene`回调的时机?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import 'dart:io'; import 'dart:isolate';
int i;
void main() { i = 10; ReceivePort receivePort = new ReceivePort(); Isolate.spawn(isolateMain, receivePort.sendPort);
receivePort.listen((message) { if (message is SendPort) { message.send("好呀好呀!"); } else { print("接到子isolate消息:" + message); } });
sleep(Duration(seconds: 2)); print("休眠完成"); }
void isolateMain(SendPort sendPort) { print(i);
ReceivePort receivePort = new ReceivePort(); sendPort.send(receivePort.sendPort); sendPort.send("去大保健吗?");
receivePort.listen((message) { print("接到主isolate消息:" + message); }); }
|
结果是大概2s后,我们的listene
才打印出其他isolate发过来的消息。同Android Handler类似,在Dart运行环境中也是靠事件驱动的,通过event loop不停的从队列中获取消息或者事件来驱动整个应用的运行,isolate发过来的消息就是通过loop处理。但是不同的是在Android中每个线程只有一个Looper所对应的MessageQueue,而Dart中有两个队列,一个叫做**event queue(事件队列),另一个叫做microtask queue(微任务队列)**。
Dart在执行完main函数后,就会由Loop开始执行两个任务队列中的Event。首先Loop检查微服务队列,依次执行Event,当微服务队列执行完后,就检查Event queue队列依次执行,在执行Event queue的过程中,每执行完一个Event就再检查一次微服务队列。所以微服务队列优先级高,可以利用微服务进行插队。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import 'dart:io'; import 'dart:isolate';
void main(){ var receivePort = new ReceivePort();
receivePort.listen((t){ print(t); Future.microtask((){ print("微任务执行1"); }); });
receivePort.sendPort.send("发送消息给消息接收器1!");
receivePort.sendPort.send("发送消息给消息接收器2!");
receivePort.sendPort.send("发送消息给消息接收器3!");
sleep(Duration(seconds: 10)); }
|
我们先来看个例子:
1 2 3 4 5 6 7 8 9
| import 'dart:io';
void main(){ new File("/Users/enjoy/a.txt").readAsString().then((content){ print(content); }); while(true){} }
|
文件内容永远也无法打印出来,因为main函数还没执行完。而then方法是由Loop检查Event queue执行的。
如果需要往微服务中插入Event进行插队:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import 'dart:async'; import 'dart:io';
void main(){ new File("/Users/enjoy/a.txt").readAsString().then((content){ print(content); }); Future.microtask((){ print("future: excute microtask");
});
}
|
Future
在 Dart 库中随处可见 Future 对象,通常异步函数返回的对象就是一个 Future。 当一个 future _执行完后_,他里面的值 就可以使用了,可以使用 then()
来在 future 完成的时候执行其他代码。Future对象其实就代表了在事件队列中的一个事件的结果。
异常
1 2 3 4 5 6 7
|
new File("/Users/enjoy/a1.txt").readAsString().then((content) { print(content); }).catchError((e, s) { print(s); });
|
组合
then()
的返回值同样是一个future对象,可以利用队列的原理进行组合异步任务
1 2 3 4 5 6 7 8
| new File("/Users/enjoy/a.txt").readAsString().then((content) { print(content); return 1; }).then((i){ print(i); }).catchError((e, s) { print(s); });
|
上面的方式是等待执行完成读取文件之后,再执行一个新的future。如果我们需要等待一组任务都执行完成再统一处理一些事情,可以通过wait()
完成。
1 2 3 4 5 6 7 8
| Future readDone = new File("/Users/enjoy/a.txt").readAsString(); Future delayedDone = Future.delayed(Duration(seconds: 3));
Future.wait([readDone, delayedDone]).then((values) { print(values[0]); print(values[1]); });
|
注意使用Future.delayed(Duration(seconds: 3));
延迟3秒并不是3秒后就一定执行,我们已经了解过了前面的事件队列和微任务队列,只有前面的事件或微任务完成后,我们才能开始计时,所以延迟时间是大于3秒的。
Stream
Stream(流) 在 Dart API 中也经常出现,表示发出的一系列的异步数据。 Stream 是一个异步数据源,它是 Dart 中处理异步事件流的统一 API。
Future 表示稍后获得的一个数据,所有异步的操作的返回值都用 Future 来表示。但是 Future 只能表示一次异步获得的数据。而 Stream 表示多次异步获得的数据。比如 IO 处理的时候,每次只会读取一部分数据和一次性读取整个文件的内容相比,Stream 的好处是处理过程中内存占用较小。而 File 的 `readAsString()`是一次性读取整个文件的内容进来,虽然获得完整内容处理起来比较方便,但是如果文件很大的话就会导致内存占用过大的问题。
1 2 3 4 5 6 7
| new File("/Users/enjoy/app-release.apk").openRead().listen((List<int> bytes) { print("stream执行"); });
new File("/Users/enjoy/app-release.apk").readAsBytes().then((_){ print("future执行"); });
|
listen()
其实就是订阅这个Stream,它会返回一个StreamSubscription
订阅者。订阅者肯定就提供了取消订阅的cancel()
,去掉后我们的listen中就接不到任何信息了。除了cancel()
取消方法之外,我们还可以使用onData()
重置listene方法,onDone
监听完成等等操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| StreamSubscription<List<int>> listen = new File("/Users/enjoy/app-release.apk").openRead().listen((List<int> bytes) { print("stream执行"); }); listen.onData((_){ print("替代listene"); }); listen.onDone((){ print("结束"); }); listen.onError((e,s){ print("异常"); });
listen.pause();
listen.resume();
|
广播模式
Stream有两种订阅模式:单订阅和多订阅。单订阅就是只能有一个订阅者,上面的使用我们都是单订阅模式,而广播是可以有多个订阅者。通过 Stream.asBroadcastStream() 可以将一个单订阅模式的 Stream 转换成一个多订阅模式的 Stream,isBroadcast 属性可以判断当前 Stream 所处的模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| var stream = new File("/Users/enjoy/app-release.apk").openRead(); stream.listen((List<int> bytes) { });
var broadcastStream = new File("/Users/enjoy/app-release.apk").openRead().asBroadcastStream(); broadcastStream.listen((_){ print("订阅者1"); }); broadcastStream.listen((_){ print("订阅者2"); });
|
需要注意的是,多订阅模式如果没有及时添加订阅者则可能丢数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| var stream = Stream.fromIterable([1, 2, 3]); new Timer(new Duration(seconds: 3), () => stream.listen(print));
var streamController = StreamController.broadcast(); streamController.add(1); streamController.stream.listen((i){ print("broadcast:$i"); }); streamController.close();
var broadcastStream = Stream.fromIterable([1, 2, 3]).asBroadcastStream(); new Timer(new Duration(seconds: 3), () => broadcastStream.listen(print));
|
在广播发送之后,我们再订阅,并且能够收到通知的,这种模式的广播,我们一般称它为粘性广播。
async/await
async/await的作用无非就是异步代码同步化。
使用async
和await
的代码是异步的,但是看起来很像同步代码。当我们需要获得A的结果,再执行B,时,你需要then()->then()
,但是利用async
与await
能够非常好的解决回调地狱的问题:
1 2 3 4 5 6 7 8 9
|
Future<String> readFile() async { String content = await new File("/Users/xiang/enjoy/a.txt").readAsString(); String content2 = await new File("/Users/xiang/enjoy/a.txt").readAsString(); return content; }
|