博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava 和 RxAndroid 五(线程调度)
阅读量:4346 次
发布时间:2019-06-07

本文共 8169 字,大约阅读时间需要 27 分钟。

对rxJava不了解的同学可以先看

 

本文将有几个例子说明,rxjava线程调度的正确使用姿势。

例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable
               
.create(
new
Observable.OnSubscribe<String>() {
                   
@Override
                   
public
void
call(Subscriber<?
super
String> subscriber) {
                       
Logger.v(
"rx_call"
, Thread.currentThread().getName()  );
 
                       
subscriber.onNext(
"dd"
);
                       
subscriber.onCompleted();
                   
}
               
})
               
.map(
new
Func1<String, String >() {
                   
@Override
                   
public
String call(String s) {
                       
Logger.v(
"rx_map"
, Thread.currentThread().getName()  );
                       
return
s +
"88"
;
                   
}
               
})
               
.subscribe(
new
Action1<String>() {
                   
@Override
                   
public
void
call(String s) {
                       
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName()  );
                   
}
               
}) ;

  结果

/rx_call: main           -- 主线程

/rx_map: main        --  主线程
/rx_subscribe: main   -- 主线程

例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  
new
Thread(
new
Runnable() {
           
@Override
           
public
void
run() {
               
Logger.v(
"rx_newThread"
, Thread.currentThread().getName()  );
               
rx();
           
}
       
}).start();
 
void
rx(){
       
Observable
               
.create(
new
Observable.OnSubscribe<String>() {
                   
@Override
                   
public
void
call(Subscriber<?
super
String> subscriber) {
                       
Logger.v(
"rx_call"
, Thread.currentThread().getName()  );
 
                       
subscriber.onNext(
"dd"
);
                       
subscriber.onCompleted();
                   
}
               
})
               
.map(
new
Func1<String, String >() {
                   
@Override
                   
public
String call(String s) {
                       
Logger.v(
"rx_map"
, Thread.currentThread().getName()  );
                       
return
s +
"88"
;
                   
}
               
})
               
.subscribe(
new
Action1<String>() {
                   
@Override
                   
public
void
call(String s) {
                       
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName()  );
                   
}
               
}) ;
 
   
}

 

      结果

/rx_newThread: Thread-564   -- 子线程

/rx_call: Thread-564              -- 子线程
/rx_map: Thread-564            -- 子线程 
/rx_subscribe: Thread-564    -- 子线程

 

  • 通过例1和例2,说明,Rxjava默认运行在当前线程中。如果当前线程是子线程,则rxjava运行在子线程;同样,当前线程是主线程,则rxjava运行在主线程

 

例3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable
               
.create(
new
Observable.OnSubscribe<String>() {
                   
@Override
                   
public
void
call(Subscriber<?
super
String> subscriber) {
                       
Logger.v(
"rx_call"
, Thread.currentThread().getName()  );
 
                       
subscriber.onNext(
"dd"
);
                       
subscriber.onCompleted();
                   
}
               
})
 
               
.subscribeOn(Schedulers.io())
               
.observeOn(AndroidSchedulers.mainThread())
 
               
.map(
new
Func1<String, String >() {
                   
@Override
                   
public
String call(String s) {
                       
Logger.v(
"rx_map"
, Thread.currentThread().getName()  );
                       
return
s +
"88"
;
                   
}
               
})
               
.subscribe(
new
Action1<String>() {
                   
@Override
                   
public
void
call(String s) {
                       
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName()  );
                   
}
               
}) ;

  结果

/rx_call: RxCachedThreadScheduler-1    --io线程

/rx_map: main                                     --主线程
/rx_subscribe: main                              --主线程

 

例4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable
               
.create(
new
Observable.OnSubscribe<String>() {
                   
@Override
                   
public
void
call(Subscriber<?
super
String> subscriber) {
                       
Logger.v(
"rx_call"
, Thread.currentThread().getName()  );
 
                       
subscriber.onNext(
"dd"
);
                       
subscriber.onCompleted();
                   
}
               
})
               
.map(
new
Func1<String, String >() {
                   
@Override
                   
public
String call(String s) {
                       
Logger.v(
"rx_map"
, Thread.currentThread().getName()  );
                       
return
s +
"88"
;
                   
}
               
})
 
               
.subscribeOn(Schedulers.io())
               
.observeOn(AndroidSchedulers.mainThread())
 
               
.subscribe(
new
Action1<String>() {
                   
@Override
                   
public
void
call(String s) {
                       
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName()  );
                   
}
               
}) ; 

      结果

/rx_call: RxCachedThreadScheduler-1     --io线程

/rx_map: RxCachedThreadScheduler-1   --io线程
/rx_subscribe: main                              --主线程

   

  • 通过例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 写的位置不一样,造成的结果也不一样。从例4中可以看出 map() 操作符默认运行在事件产生的线程之中。事件消费只是在 subscribe() 里面。
  • 对于 create() , just() , from()   等                 --- 事件产生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消费

  •   事件产生:默认运行在当前线程,可以由 subscribeOn()  自定义线程

         事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程

       事件消费:默认运行在当前线程,可以有observeOn() 自定义

 

例5  多次切换线程

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Observable
                
.create(
new
Observable.OnSubscribe<String>() {
                    
@Override
                    
public
void
call(Subscriber<?
super
String> subscriber) {
                        
Logger.v(
"rx_call"
, Thread.currentThread().getName()  );
 
                        
subscriber.onNext(
"dd"
);
                        
subscriber.onCompleted();
                    
}
                
})
 
                
.observeOn( Schedulers.newThread() )   
//新线程
 
                
.map(
new
Func1<String, String >() {
                    
@Override
                    
public
String call(String s) {
                        
Logger.v(
"rx_map"
, Thread.currentThread().getName()  );
                        
return
s +
"88"
;
                    
}
                
})
 
                
.observeOn( Schedulers.io() )     
//io线程
 
                
.filter(
new
Func1<String, Boolean>() {
                    
@Override
                    
public
Boolean call(String s) {
                        
Logger.v(
"rx_filter"
, Thread.currentThread().getName()  );
                        
return
s !=
null
;
                    
}
                
})
 
                
.subscribeOn(Schedulers.io())    
//定义事件产生线程:io线程
                
.observeOn(AndroidSchedulers.mainThread())    
//事件消费线程:主线程
 
                
.subscribe(
new
Action1<String>() {
                    
@Override
                    
public
void
call(String s) {
                        
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName()  );
                    
}
                
}) ;

  结果

/rx_call: RxCachedThreadScheduler-1           -- io 线程

/rx_map: RxNewThreadScheduler-1             -- new出来的线程
/rx_filter: RxCachedThreadScheduler-2        -- io线程
/rx_subscribe: main                                   -- 主线程

 

例6:只规定了事件产生的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable
         
.create(
new
Observable.OnSubscribe<String>() {
             
@Override
             
public
void
call(Subscriber<?
super
String> subscriber) {
                 
Log.v(
"rx--create "
, Thread.currentThread().getName() ) ;
                 
subscriber.onNext(
"dd"
) ;
             
}
         
})
         
.subscribeOn(Schedulers.io())
         
.subscribe(
new
Action1<String>() {
             
@Override
             
public
void
call(String s) {
                 
Log.v(
"rx--subscribe "
, Thread.currentThread().getName() ) ;
             
}
         
}) ;

  结果

/rx--create: RxCachedThreadScheduler-4                      // io 线程

/rx--subscribe: RxCachedThreadScheduler-4                 // io 线程

     

例:7:只规定事件消费线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable
               
.create(
new
Observable.OnSubscribe<String>() {
                   
@Override
                   
public
void
call(Subscriber<?
super
String> subscriber) {
                       
Log.v(
"rx--create "
, Thread.currentThread().getName() ) ;
                       
subscriber.onNext(
"dd"
) ;
                   
}
               
})
               
.observeOn( Schedulers.newThread() )
               
.subscribe(
new
Action1<String>() {
                   
@Override
                   
public
void
call(String s) {
                       
Log.v(
"rx--subscribe "
, Thread.currentThread().getName() ) ;
                   
}
               
}) ;

  结果

/rx--create: main                                           -- 主线程

/rx--subscribe: RxNewThreadScheduler-1        --  new 出来的子线程 

      

    从例6可以看出,如果只规定了事件产生的线程,那么事件消费线程将跟随事件产生线程。

    从例7可以看出,如果只规定了事件消费的线程,那么事件产生的线程和 当前线程保持一致。

 

例8:线程调度封装

 在Android 常常有这样的场景,后台处理处理数据,前台展示数据。

一般的用法:

1
2
3
4
5
6
7
8
9
Observable
             
.just(
"123"
)
             
.subscribeOn( Schedulers.io())
             
.observeOn( AndroidSchedulers.mainThread() )
             
.subscribe(
new
Action1() {
                 
@Override
                 
public
void
call(Object o) {
                 
}
             
}) ;

  但是项目中这种场景有很多,所以我们就想能不能把这种场景的调度方式封装起来,方便调用。

简单的封装

1
2
3
4
public
Observable apply( Observable observable ){
   
return
observable.subscribeOn( Schedulers.io() )
            
.observeOn( AndroidSchedulers.mainThread() ) ;
}

使用

1
2
3
4
5
6
7
apply( Observable.just(
"123"
) )
              
.subscribe(
new
Action1() {
                  
@Override
                  
public
void
call(Object o) {
 
                  
}
              
}) ;

弊端:虽然上面的这种封装可以做到线程调度的目的,但是它破坏了链式编程的结构,是编程风格变得不优雅。

改进:Transformers 的使用(就是转化器的意思,把一种类型的Observable转换成另一种类型的Observable )

改进后的封装

1
2
3
4
5
6
Observable.Transformer schedulersTransformer =
new 
Observable.Transformer() {
    
@Override
public
Object call(Object observable) {
        
return
((Observable)  observable).subscribeOn(Schedulers.newThread())
                
.observeOn(AndroidSchedulers.mainThread());
    
}
};

  使用

1
2
3
4
5
6
7
8
Observable
          
.just(
"123"
)
          
.compose( schedulersTransformer )
          
.subscribe(
new
Action1() {
              
@Override
              
public
void
call(Object o) {
              
}
          
}) ;

  弊端:虽然保持了链式编程结构的完整,但是每次调用 .compose( schedulersTransformer ) 都是 new 了一个对象的。所以我们需要再次封装,尽量保证单例的模式。

改进后的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package
lib.app.com.myapplication;
 
import
rx.Observable;
import
rx.android.schedulers.AndroidSchedulers;
import
rx.schedulers.Schedulers;
 
/**
 
* Created by ${zyj} on 2016/7/1.
 
*/
public
class
RxUtil {
 
    
private
final
static
Observable.Transformer schedulersTransformer =
new 
Observable.Transformer() {
        
@Override
public
Object call(Object observable) {
            
return
((Observable)  observable).subscribeOn(Schedulers.newThread())
                    
.observeOn(AndroidSchedulers.mainThread());
        
}
    
};
 
   
public
static 
<T> Observable.Transformer<T, T> applySchedulers() {
        
return
(Observable.Transformer<T, T>) schedulersTransformer;
    
}
 
}

  使用

1
2
3
4
5
6
7
8
Observable
            
.just(
"123"
)
            
.compose( RxUtil.<String>applySchedulers() )
            
.subscribe(
new
Action1() {
                
@Override
                
public
void
call(Object o) {
                
}
            
}) ;

转载于:https://www.cnblogs.com/Free-Thinker/p/7561487.html

你可能感兴趣的文章
Codeforces 678E Another Sith Tournament 状压DP
查看>>
201771010112罗松《面向对象程序设计(java)》第七周学习总结
查看>>
mysql数据库的锁表与解决办法(原博客url:http://www.cnblogs.com/wanghuaijun/p/5949934.html)...
查看>>
Git
查看>>
【CF860E】Arkady and a Nobody-men 长链剖分
查看>>
python爬虫模拟登陆
查看>>
Redis(六)-- SpringMVC整合Redis
查看>>
bzoj1660:乱发节
查看>>
即时通信系统Openfire分析之四:消息路由
查看>>
SQL 笔记
查看>>
浅析Staging
查看>>
Unity倒计时动画
查看>>
rem布局
查看>>
Windows server 2008 R2配置多个远程连接的教程
查看>>
PHP 重置数组为连续数字索引的几种方式
查看>>
南阳理工acm 88-汉诺塔(一)
查看>>
160809308周子济第六次作业
查看>>
大型Web应用运行时 PHP负载均衡指南
查看>>
为phpStorm 配置PHP_CodeSniffer自动检查代码
查看>>
软件cs页面分辨率测试
查看>>