aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/sync/Semaphore.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/sync/Semaphore.kt41
1 files changed, 20 insertions, 21 deletions
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
index 6e0552d1..6ab377da 100644
--- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -1,19 +1,19 @@
package kotlinx.coroutines.sync
-import kotlinx.atomicfu.atomic
-import kotlinx.atomicfu.atomicArrayOfNulls
-import kotlinx.atomicfu.getAndUpdate
-import kotlinx.atomicfu.loop
+import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
-import kotlin.coroutines.resume
-import kotlin.math.max
+import kotlin.coroutines.*
+import kotlin.jvm.*
+import kotlin.math.*
/**
- * A counting semaphore for coroutines. It maintains a number of available permits.
- * Each [acquire] suspends if necessary until a permit is available, and then takes it.
+ * A counting semaphore for coroutines that logically maintains a number of available permits.
+ * Each [acquire] takes a single permit or suspends until it is available.
* Each [release] adds a permit, potentially releasing a suspended acquirer.
+ * Semaphore is fair and maintains a FIFO order of acquirers.
*
+ * Semaphores are mostly used to limit the number of coroutines that have an access to particular resource.
* Semaphore with `permits = 1` is essentially a [Mutex].
**/
public interface Semaphore {
@@ -29,11 +29,12 @@ public interface Semaphore {
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
- * *Cancellation of suspended semaphore acquisition` is atomic* -- when this function
+ * *Cancellation of suspended semaphore acquisition is atomic* -- when this function
* throws [CancellationException] it means that the semaphore was not acquired.
*
- * Note, that this function does not check for cancellation when it is not suspended.
- * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ * Note, that this function does not check for cancellation when it does not suspend.
+ * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
+ * check for cancellation in tight loops if needed.
*
* Use [tryAcquire] to try acquire a permit of this semaphore without suspension.
*/
@@ -49,8 +50,7 @@ public interface Semaphore {
/**
* Releases a permit, returning it into this semaphore. Resumes the first
* suspending acquirer if there is one at the point of invocation.
- * Throws [IllegalStateException] if there is no acquired permit
- * at the point of invocation.
+ * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
*/
public fun release()
}
@@ -83,8 +83,8 @@ private class SemaphoreImpl(
private val permits: Int, acquiredPermits: Int
) : Semaphore, SegmentQueue<SemaphoreSegment>() {
init {
- require(permits > 0) { "Semaphore should have at least 1 permit" }
- require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..permits" }
+ require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
+ require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
}
override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev)
@@ -96,7 +96,7 @@ private class SemaphoreImpl(
* and the maximum number of waiting acquirers cannot be greater than 2^31 in any
* real application.
*/
- private val _availablePermits = atomic(permits)
+ private val _availablePermits = atomic(permits - acquiredPermits)
override val availablePermits: Int get() = max(_availablePermits.value, 0)
// The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
@@ -126,8 +126,8 @@ private class SemaphoreImpl(
resumeNextFromQueue()
}
- internal fun incPermits() = _availablePermits.getAndUpdate { cur ->
- check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" }
+ fun incPermits() = _availablePermits.getAndUpdate { cur ->
+ check(cur < permits) { "The number of released permits cannot be greater than $permits" }
cur + 1
}
@@ -176,6 +176,8 @@ private class CancelSemaphoreAcquisitionHandler(
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
+ private val cancelledSlots = atomic(0)
+ override val removed get() = cancelledSlots.value == SEGMENT_SIZE
@Suppress("NOTHING_TO_INLINE")
inline fun get(index: Int): Any? = acquirers[index].value
@@ -186,9 +188,6 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<Semap
@Suppress("NOTHING_TO_INLINE")
inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value)
- private val cancelledSlots = atomic(0)
- override val removed get() = cancelledSlots.value == SEGMENT_SIZE
-
// Cleans the acquirer slot located by the specified index
// and removes this segment physically if all slots are cleaned.
fun cancel(index: Int): Boolean {