1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use {Poll, Async};
use stream::{Stream, Fuse};
/// Stream adapter that combines two streams sharing the same `Item` and
/// `Error` types into one.
///
/// Each `poll` tries one inner stream first and the other second; `flag`
/// records which of the two goes first on the next call.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Select<S1, S2> {
/// First inner stream, wrapped in `Fuse`.
// NOTE(review): presumably fused so that it may still be polled after it
// has reported completion — confirm against `Fuse`'s contract.
stream1: Fuse<S1>,
/// Second inner stream, wrapped in `Fuse` for the same reason.
stream2: Fuse<S2>,
/// When `true`, `stream2` is polled before `stream1` on the next `poll`;
/// when `false`, `stream1` is polled first.
flag: bool,
}
/// Creates a `Select` over `stream1` and `stream2`.
///
/// Both streams must yield the same `Item` and `Error` types. Each one is
/// wrapped with `fuse()` before being stored, and the ordering flag starts
/// out `false`, so `stream1` is the first to be polled.
pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2>
    where S1: Stream,
          S2: Stream<Item = S1::Item, Error = S1::Error>
{
    let fused_first = stream1.fuse();
    let fused_second = stream2.fuse();

    Select {
        stream1: fused_first,
        stream2: fused_second,
        flag: false,
    }
}
impl<S1, S2> Stream for Select<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
type Item = S1::Item;
type Error = S1::Error;
fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
let (a, b) = if self.flag {
(&mut self.stream2 as &mut Stream<Item=_, Error=_>,
&mut self.stream1 as &mut Stream<Item=_, Error=_>)
} else {
(&mut self.stream1 as &mut Stream<Item=_, Error=_>,
&mut self.stream2 as &mut Stream<Item=_, Error=_>)
};
self.flag = !self.flag;
let a_done = match try!(a.poll()) {
Async::Ready(Some(item)) => return Ok(Some(item).into()),
Async::Ready(None) => true,
Async::NotReady => false,
};
match try!(b.poll()) {
Async::Ready(Some(item)) => {
if !a_done {
self.flag = !self.flag;
}
return Ok(Some(item).into())
}
Async::Ready(None) if a_done => Ok(None.into()),
Async::Ready(None) => Ok(Async::NotReady),
Async::NotReady => Ok(Async::NotReady),
}
}
}