Kotlin 的锁和多线程同步

2024-06-14 1414阅读

Synchronized.kt 的源码:

/**
 * Executes the given function [block] while holding the monitor of the given object [lock].
 */
@kotlin.internal.InlineOnly
public inline fun  synchronized(lock: Any, block: () -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
    monitorEnter(lock)
    try {
        return block()
    }
    finally {
        @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
        monitorExit(lock)
    }
}

JvmFlagAnnotations.kt 的源码:

/**
 * Marks the JVM backing field of the annotated property as `volatile`, meaning that writes to this field
 * are immediately made visible to other threads.
 */
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Volatile
/**
 * Marks the JVM backing field of the annotated property as `transient`, meaning that it is not
 * part of the default serialized form of the object.
 */
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Transient
/**
 * Marks the JVM method generated from the annotated function as `strictfp`, meaning that the precision
 * of floating point operations performed inside the method needs to be restricted in order to
 * achieve better portability.
 */
@Target(FUNCTION, CONSTRUCTOR, PROPERTY_GETTER, PROPERTY_SETTER, CLASS)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Strictfp
/**
 * Marks the JVM method generated from the annotated function as `synchronized`, meaning that the method
 * will be protected from concurrent execution by multiple threads by the monitor of the instance (or,
 * for static methods, the class) on which the method is defined.
 */
@Target(FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Synchronized

如何使用 Synchronized 同步锁:

Kotlin 的锁和多线程同步
(图片来源网络,侵删)

在Java中,给一个方法加锁 ,需要给方法加 synchronized 关键字

public synchronized void doSomething() {
}

kotlin 中没有 synchronized 关键之,取而代之的是 @Synchronized 注解

class MyUtil {
    @Synchronized
    fun doSomething() {
    }
}

Kotlin 在方法内,可以使用 block 块

class Util {
    val lock = Any()
    fun main() {
        synchronized(lock) {
        }
    }
}

Volatile 关键字

有时仅仅为了读写一个或者两个实例域就使用同步的话,显得开销过大;而Volatile关键字为实例域的同步访问提供了免锁的机制。

当一个共享变量被vlatile修饰时,其就具备两个含义:

1、一个线程修改了变量的值,变量的新值对其他线程是立即可见的。

2、禁止使用指令重排序。

什么是指令重排序呢?

重排序通常是编译器或者运行环境为了优化程序性能而采取的对指令重新排序执行的一种手段。重排序分为两类:编译期重排序和运行期重排序,分别对应着编译时和运行时环境。

在 kotlin 中没有 volatile 关键字,但是有 @Volatile 注解

class Util {
    @Volatile
    var lock = Any()
}

在 kotlin 中的 Any 和 java 中的 Object 相似,每一个类都是从 Any 继承过来的,但是 Any 并没有声明 wait() , notify() 和 notifyAll() 方法,这就意味着,你不能在kotlin类中调用这些方法。但是你仍然能够使用java.lang.Object的实例作为lock,并且调用相关的方法。下面将会展示一个使用 Object 做为 lock 解决生产者和消费者的问题。

    private val lock = Object()
    fun produce() = synchronized(lock) {
          while(items>=maxItems) {
            lock.wait()
          }
          Thread.sleep(Random.nextInt(100).toLong())
          items++
          println("Produced, count is$items:${Thread.currentThread()}")
          lock.notifyAll()
    }
    fun consume() = synchronized(lock) {
        while(items String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}
val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}
val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Thread 的 join() :

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String
    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    
    task3(s1, s2)
}

2. Synchronized

使用 synchronized 锁进行同步

    @Test
    fun test_synchrnoized() {
        lateinit var s1: String
        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        val s2: String = task2()
        synchronized(Unit) {
            task3(s1, s2)
        }
    }

但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized。

3. ReentrantLock

ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用

    @Test
    fun test_ReentrantLock() {
        lateinit var s1: String
        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        val s2: String = task2()
        lock.lock()
        task3(s1, s2)
        lock.unlock()
    }

ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务。

4. BlockingQueue

阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果

    @Test
    fun test_blockingQueue() {
        lateinit var s1: String
        val queue = SynchronousQueue()
        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()
        val s2: String = task2()
        queue.take()
        task3(s1, s2)
    }

当然,阻塞队列更多是使用在生产/消费场景中的同步。

5. CountDownLatch

UC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:

    @Test
    fun test_countdownlatch() {
        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()
        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()
        cd.await()
        task3(s1, s2)
    }

共享锁的好处是不必为了每个任务都创建单独的锁,即使再多并行任务写起来也很轻松。

6. CyclicBarrier

CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。

与 CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用。

    @Test
    fun test_CyclicBarrier() {
        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)
        Thread {
            s1 = task1()
            cb.await()
        }.start()
        Thread() {
            s2 = task1()
            cb.await()
        }.start()
        cb.await()
        task3(s1, s2)
    }

7. CAS

AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。 因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。

    @Test
    fun test_cas() {
        lateinit var s1: String
        lateinit var s2: String
        val cas = AtomicInteger(2)
        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()
        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()
        while (cas.get() != 0) {}
        task3(s1, s2)
    }

While 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。

8. Future

上面无论有锁操作还是无锁操作,都需要定义两个变量s1、s2记录结果非常不方便。 Java 1.5 开始,提供了 Callable 和 Future ,可以在任务执行结束时返回结果。

    @Test
    fun test_future() {
        val future1 = FutureTask(Callable(task1))
        val future2 = FutureTask(Callable(task2))
        Executors.newCachedThreadPool().execute(future1)
        Executors.newCachedThreadPool().execute(future2)
        task3(future1.get(), future2.get())
    }

通过 future.get(),可以同步等待结果返回,写起来非常方便。

9. CompletableFuture

future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:

    @Test
    fun test_CompletableFuture() {
        CompletableFuture.supplyAsync(task1)
            .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
                task3(p1, p2)
            }.join()
    }

10. RxJava

RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求: zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务

    @Test
    fun test_Rxjava() {
        Observable.zip(
            Observable.fromCallable(Callable(task1))
                .subscribeOn(Schedulers.newThread()),
            Observable.fromCallable(Callable(task2))
                .subscribeOn(Schedulers.newThread()),
            BiFunction(task3)
        ).test().await()
    }

11. Coroutine

前面那么多方式,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:

    @Test
    fun test_coroutine() {
        runBlocking {
            val c1 = async(Dispatchers.IO) {
                task1()
            }
            val c2 = async(Dispatchers.IO) {
                task2()
            }
            task3(c1.await(), c2.await())
        }
    }

12. Flow

Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:

    @Test
    fun test_flow() {
        val flow1 = flow { emit(task1()) }
        val flow2 = flow { emit(task2()) }
        runBlocking {
            flow1.zip(flow2) { t1, t2 ->
                task3(t1, t2)
            }.flowOn(Dispatchers.IO).collect()
        }
    }

FlowOn 使得 Task 在异步计算并发射结果。

总结

作为结论,在 Kotlin 上最好用的线程同步方案首推协程。

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]