Kotlin 的锁和多线程同步
Synchronized.kt 的源码:
/**
* Executes the given function [block] while holding the monitor of the given object [lock].
*/
@kotlin.internal.InlineOnly
public inline fun synchronized(lock: Any, block: () -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
monitorEnter(lock)
try {
return block()
}
finally {
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
monitorExit(lock)
}
}
JvmFlagAnnotations.kt 的源码:
/** * Marks the JVM backing field of the annotated property as `volatile`, meaning that writes to this field * are immediately made visible to other threads. */ @Target(FIELD) @Retention(AnnotationRetention.SOURCE) @MustBeDocumented public actual annotation class Volatile /** * Marks the JVM backing field of the annotated property as `transient`, meaning that it is not * part of the default serialized form of the object. */ @Target(FIELD) @Retention(AnnotationRetention.SOURCE) @MustBeDocumented public actual annotation class Transient /** * Marks the JVM method generated from the annotated function as `strictfp`, meaning that the precision * of floating point operations performed inside the method needs to be restricted in order to * achieve better portability. */ @Target(FUNCTION, CONSTRUCTOR, PROPERTY_GETTER, PROPERTY_SETTER, CLASS) @Retention(AnnotationRetention.SOURCE) @MustBeDocumented public actual annotation class Strictfp /** * Marks the JVM method generated from the annotated function as `synchronized`, meaning that the method * will be protected from concurrent execution by multiple threads by the monitor of the instance (or, * for static methods, the class) on which the method is defined. */ @Target(FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER) @Retention(AnnotationRetention.SOURCE) @MustBeDocumented public actual annotation class Synchronized
如何使用 Synchronized 同步锁:
在Java中,给一个方法加锁 ,需要给方法加 synchronized 关键字
public synchronized void doSomething() {
}
kotlin 中没有 synchronized 关键之,取而代之的是 @Synchronized 注解
class MyUtil {
@Synchronized
fun doSomething() {
}
}
Kotlin 在方法内,可以使用 block 块
class Util {
val lock = Any()
fun main() {
synchronized(lock) {
}
}
}
Volatile 关键字
有时仅仅为了读写一个或者两个实例域就使用同步的话,显得开销过大;而Volatile关键字为实例域的同步访问提供了免锁的机制。
当一个共享变量被vlatile修饰时,其就具备两个含义:
1、一个线程修改了变量的值,变量的新值对其他线程是立即可见的。
2、禁止使用指令重排序。
什么是指令重排序呢?
重排序通常是编译器或者运行环境为了优化程序性能而采取的对指令重新排序执行的一种手段。重排序分为两类:编译期重排序和运行期重排序,分别对应着编译时和运行时环境。
在 kotlin 中没有 volatile 关键字,但是有 @Volatile 注解
class Util {
@Volatile
var lock = Any()
}
在 kotlin 中的 Any 和 java 中的 Object 相似,每一个类都是从 Any 继承过来的,但是 Any 并没有声明 wait() , notify() 和 notifyAll() 方法,这就意味着,你不能在kotlin类中调用这些方法。但是你仍然能够使用java.lang.Object的实例作为lock,并且调用相关的方法。下面将会展示一个使用 Object 做为 lock 解决生产者和消费者的问题。
private val lock = Object()
fun produce() = synchronized(lock) {
while(items>=maxItems) {
lock.wait()
}
Thread.sleep(Random.nextInt(100).toLong())
items++
println("Produced, count is$items:${Thread.currentThread()}")
lock.notifyAll()
}
fun consume() = synchronized(lock) {
while(items String = {
sleep(2000)
"Hello".also { println("task1 finished: $it") }
}
val task2: () -> String = {
sleep(2000)
"World".also { println("task2 finished: $it") }
}
val task3: (String, String) -> String = { p1, p2 ->
sleep(2000)
"$p1 $p2".also { println("task3 finished: $it") }
}
1. Thread.join()
Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Thread 的 join() :
@Test
fun test_join() {
lateinit var s1: String
lateinit var s2: String
val t1 = Thread { s1 = task1() }
val t2 = Thread { s2 = task2() }
t1.start()
t2.start()
t1.join()
t2.join()
task3(s1, s2)
}
2. Synchronized
使用 synchronized 锁进行同步
@Test
fun test_synchrnoized() {
lateinit var s1: String
Thread {
synchronized(Unit) {
s1 = task1()
}
}.start()
val s2: String = task2()
synchronized(Unit) {
task3(s1, s2)
}
}
但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized。
3. ReentrantLock
ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用
@Test
fun test_ReentrantLock() {
lateinit var s1: String
val lock = ReentrantLock()
Thread {
lock.lock()
s1 = task1()
lock.unlock()
}.start()
val s2: String = task2()
lock.lock()
task3(s1, s2)
lock.unlock()
}
ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务。
4. BlockingQueue
阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果
@Test
fun test_blockingQueue() {
lateinit var s1: String
val queue = SynchronousQueue()
Thread {
s1 = task1()
queue.put(Unit)
}.start()
val s2: String = task2()
queue.take()
task3(s1, s2)
}
当然,阻塞队列更多是使用在生产/消费场景中的同步。
5. CountDownLatch
UC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:
@Test
fun test_countdownlatch() {
lateinit var s1: String
lateinit var s2: String
val cd = CountDownLatch(2)
Thread() {
s1 = task1()
cd.countDown()
}.start()
Thread() {
s2 = task2()
cd.countDown()
}.start()
cd.await()
task3(s1, s2)
}
共享锁的好处是不必为了每个任务都创建单独的锁,即使再多并行任务写起来也很轻松。
6. CyclicBarrier
CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。
与 CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用。
@Test
fun test_CyclicBarrier() {
lateinit var s1: String
lateinit var s2: String
val cb = CyclicBarrier(3)
Thread {
s1 = task1()
cb.await()
}.start()
Thread() {
s2 = task1()
cb.await()
}.start()
cb.await()
task3(s1, s2)
}
7. CAS
AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。 因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。
@Test
fun test_cas() {
lateinit var s1: String
lateinit var s2: String
val cas = AtomicInteger(2)
Thread {
s1 = task1()
cas.getAndDecrement()
}.start()
Thread {
s2 = task2()
cas.getAndDecrement()
}.start()
while (cas.get() != 0) {}
task3(s1, s2)
}
While 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。
8. Future
上面无论有锁操作还是无锁操作,都需要定义两个变量s1、s2记录结果非常不方便。 Java 1.5 开始,提供了 Callable 和 Future ,可以在任务执行结束时返回结果。
@Test
fun test_future() {
val future1 = FutureTask(Callable(task1))
val future2 = FutureTask(Callable(task2))
Executors.newCachedThreadPool().execute(future1)
Executors.newCachedThreadPool().execute(future2)
task3(future1.get(), future2.get())
}
通过 future.get(),可以同步等待结果返回,写起来非常方便。
9. CompletableFuture
future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:
@Test
fun test_CompletableFuture() {
CompletableFuture.supplyAsync(task1)
.thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
task3(p1, p2)
}.join()
}
10. RxJava
RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求: zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务
@Test
fun test_Rxjava() {
Observable.zip(
Observable.fromCallable(Callable(task1))
.subscribeOn(Schedulers.newThread()),
Observable.fromCallable(Callable(task2))
.subscribeOn(Schedulers.newThread()),
BiFunction(task3)
).test().await()
}
11. Coroutine
前面那么多方式,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:
@Test
fun test_coroutine() {
runBlocking {
val c1 = async(Dispatchers.IO) {
task1()
}
val c2 = async(Dispatchers.IO) {
task2()
}
task3(c1.await(), c2.await())
}
}
12. Flow
Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:
@Test
fun test_flow() {
val flow1 = flow { emit(task1()) }
val flow2 = flow { emit(task2()) }
runBlocking {
flow1.zip(flow2) { t1, t2 ->
task3(t1, t2)
}.flowOn(Dispatchers.IO).collect()
}
}
FlowOn 使得 Task 在异步计算并发射结果。
总结
作为结论,在 Kotlin 上最好用的线程同步方案首推协程。
