diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/sync/Semaphore.kt')
-rw-r--r-- | kotlinx-coroutines-core/common/src/sync/Semaphore.kt | 41 |
1 files changed, 20 insertions, 21 deletions
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 6e0552d1..6ab377da 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -1,19 +1,19 @@ package kotlinx.coroutines.sync -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.atomicArrayOfNulls -import kotlinx.atomicfu.getAndUpdate -import kotlinx.atomicfu.loop +import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* -import kotlin.coroutines.resume -import kotlin.math.max +import kotlin.coroutines.* +import kotlin.jvm.* +import kotlin.math.* /** - * A counting semaphore for coroutines. It maintains a number of available permits. - * Each [acquire] suspends if necessary until a permit is available, and then takes it. + * A counting semaphore for coroutines that logically maintains a number of available permits. + * Each [acquire] takes a single permit or suspends until it is available. * Each [release] adds a permit, potentially releasing a suspended acquirer. + * Semaphore is fair and maintains a FIFO order of acquirers. * + * Semaphores are mostly used to limit the number of coroutines that have an access to particular resource. * Semaphore with `permits = 1` is essentially a [Mutex]. **/ public interface Semaphore { @@ -29,11 +29,12 @@ public interface Semaphore { * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with [CancellationException]. * - * *Cancellation of suspended semaphore acquisition` is atomic* -- when this function + * *Cancellation of suspended semaphore acquisition is atomic* -- when this function * throws [CancellationException] it means that the semaphore was not acquired. * - * Note, that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * Note, that this function does not check for cancellation when it does not suspend. + * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically + * check for cancellation in tight loops if needed. * * Use [tryAcquire] to try acquire a permit of this semaphore without suspension. */ @@ -49,8 +50,7 @@ public interface Semaphore { /** * Releases a permit, returning it into this semaphore. Resumes the first * suspending acquirer if there is one at the point of invocation. - * Throws [IllegalStateException] if there is no acquired permit - * at the point of invocation. + * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire]. */ public fun release() } @@ -83,8 +83,8 @@ private class SemaphoreImpl( private val permits: Int, acquiredPermits: Int ) : Semaphore, SegmentQueue<SemaphoreSegment>() { init { - require(permits > 0) { "Semaphore should have at least 1 permit" } - require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..permits" } + require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } + require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } } override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev) @@ -96,7 +96,7 @@ private class SemaphoreImpl( * and the maximum number of waiting acquirers cannot be greater than 2^31 in any * real application. */ - private val _availablePermits = atomic(permits) + private val _availablePermits = atomic(permits - acquiredPermits) override val availablePermits: Int get() = max(_availablePermits.value, 0) // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`; @@ -126,8 +126,8 @@ private class SemaphoreImpl( resumeNextFromQueue() } - internal fun incPermits() = _availablePermits.getAndUpdate { cur -> - check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" } + fun incPermits() = _availablePermits.getAndUpdate { cur -> + check(cur < permits) { "The number of released permits cannot be greater than $permits" } cur + 1 } @@ -176,6 +176,8 @@ private class CancelSemaphoreAcquisitionHandler( private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) { val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE) + private val cancelledSlots = atomic(0) + override val removed get() = cancelledSlots.value == SEGMENT_SIZE @Suppress("NOTHING_TO_INLINE") inline fun get(index: Int): Any? = acquirers[index].value @@ -186,9 +188,6 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<Semap @Suppress("NOTHING_TO_INLINE") inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) - private val cancelledSlots = atomic(0) - override val removed get() = cancelledSlots.value == SEGMENT_SIZE - // Cleans the acquirer slot located by the specified index // and removes this segment physically if all slots are cleaned. fun cancel(index: Int): Boolean { |