channel
rust supports two types of channels, "local" channels (same isolate) and "isolate" channels (different isolates).
Local Channels
channel
is used for communication between produces and consumers on the same isolate. channel
is
similar to StreamController
except it buffers data until read and will never throw.
In more detail, channel
returns a Sender
and Receiver
. Each item T
sent by the Sender
will only be seen once by the Receiver
. Even if the Sender
calls close
while the Receiver
's buffer
is not empty, the Receiver
will still yield the remaining items in the buffer until empty.
Examples
Single Sender, Single Receiver
In this example, a single sender sends data to a single receiver. The receiver retrieves the data and processes it.
import 'package:rust/sync.dart';
void main() async {
final (tx, rx) = channel<int>();
// Sender sends data
tx.send(1);
tx.send(2);
tx.send(3);
// Receiver retrieves data
for (int i = 0; i < 3; i++) {
print(await rx.recv()); // Outputs: 1, 2, 3
}
}
Receiver with Timeout
This example shows how to handle timeouts when receiving data from a channel.
import 'package:rust/sync.dart';
void main() async {
final (tx, rx) = channel<int>();
// Sender sends data
tx.send(1);
// Receiver retrieves data with a timeout
final result = await rx.recvTimeout(Duration(milliseconds: 100));
if (result.isOk()) {
print(result.unwrap()); // Outputs: 1
} else {
print("Timeout"); // If timeout occurs
}
}
Receiver with Error Handling
In this example, we see how to handle errors that might occur while receiving data from a channel.
import 'package:rust/sync.dart';
void main() async {
final (tx, rx) = channel<int>();
// Sender sends data and then an error
tx.send(1);
tx.send(2);
tx.sendError(Exception("Test error"));
// Receiver retrieves data and handles errors
for (int i = 0; i < 3; i++) {
final result = await rx.recv();
if (result.isOk()) {
print(result.unwrap()); // Outputs: 1, 2
} else {
print("Error: ${result.unwrapErr()}"); // Handles error
}
}
}
Iterating Over Received Data
This example demonstrates how to iterate over the received data using the iter method.
import 'package:rust/sync.dart';
void main() async {
final (tx, rx) = channel<int>();
// Sender sends data
tx.send(1);
tx.send(2);
tx.send(3);
// Receiver iterates over the data
final iterator = rx.iter();
for (final value in iterator) {
print(value); // Outputs: 1, 2, 3
}
}
Using Receiver as a Stream
In this example, we see how to use the receiver as a stream, allowing for asynchronous data handling.
import 'package:rust/sync.dart';
void main() async {
final (tx, rx) = channel<int>();
// Sender sends data
tx.send(1);
tx.send(2);
tx.send(3);
// Close the sender after some delay
() async {
await Future.delayed(Duration(seconds: 1));
tx.close();
}();
// Receiver processes the stream of data
await for (final value in rx.stream()) {
print(value); // Outputs: 1, 2, 3
}
}
Isolate Channels
isolateChannel
is used for bi-directional isolate communication. The returned
Sender
and Receiver
can communicate with the spawned isolate and
the spawned isolate is passed a Sender
and Receiver
to communicate with the original isolate.
Each item T
sent by the Sender
will only be seen once by the Receiver
. Even if the Sender
calls close
while the Receiver
's buffer
is not empty, the Receiver
will still yield the remaining items in the buffer until empty.
Types that can be sent over a SendPort
, as defined here,
are allow to be sent between isolates. Otherwise a toIsolateCodec
and/or a fromIsolateCodec
can be passed
to encode and decode the messages.
Note: Dart does not support isolates on web. Therefore, if your compilation target is web, you cannot use
isolateChannel
.
Examples
Simple String Communication
This example demonstrates a simple string message being sent and received between the main isolate and a spawned isolate.
void main() async {
final (tx1, rx1) = await isolateChannel<String, String>((tx2, rx2) async {
assert((await rx2.recv()).unwrap() == "hello");
tx2.send("hi");
}, toIsolateCodec: const StringCodec(), fromIsolateCodec: const StringCodec());
tx1.send("hello");
expect((await rx1.recv()).unwrap(), "hi");
}
Different Codecs for Communication
This example demonstrates using different codecs for communication between the main isolate and a spawned isolate.
void main() async {
final (tx1, rx1) = await isolateChannel<String, int>((tx2, rx2) async {
assert((await rx2.recv()).unwrap() == "hello");
tx2.send(1);
}, toIsolateCodec: const StringCodec(), fromIsolateCodec: const IntCodec());
tx1.send("hello");
expect((await rx1.recv()).unwrap(), 1);
}
No Codecs
This example demonstrates communication without specifying codecs, relying on the default codecs.
void main() async {
final (tx1, rx1) = await isolateChannel<String, int>((tx2, rx2) async {
assert((await rx2.recv()).unwrap() == "hello");
tx2.send(1);
});
tx1.send("hello");
expect((await rx1.recv()).unwrap(), 1);
}
Bi-directional Send and Receive
This example demonstrates a more complex scenario where multiple messages are sent and received in both directions.
void main() async {
final (tx1, rx1) = await isolateChannel<int, int>((tx2, rx2) async {
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
await Future.delayed(Duration(milliseconds: 100));
tx2.send(6);
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
await Future.delayed(Duration(milliseconds: 100));
tx2.send(7);
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
}, toIsolateCodec: const IntCodec(), fromIsolateCodec: const IntCodec());
tx1.send(1);
expect(await rx1.recv().unwrap(), 10);
tx1.send(2);
expect(await rx1.recv().unwrap(), 20);
tx1.send(3);
expect(await rx1.recv().unwrap(), 6);
tx1.send(4);
expect(await rx1.recv().unwrap(), 30);
tx1.send(5);
expect(await rx1.recv().unwrap(), 40);
expect(await rx1.recv().unwrap(), 7);
expect(await rx1.recv().unwrap(), 50);
}