How to implement pessimistic locking with Aerospike and Project Reactor?
1. Introduction
Pessimistic locking is a general pattern commonly used to maintain data consistency.
In this post I will describe how to implement pessimistic locking on top of a NoSQL storage that has a reactive client.
2. Pessimistic lock interface
We will support the following operations: tryAcquire
and release
/**
 * Reactive contract for a key-based pessimistic lock.
 *
 * <p>Both operations are asynchronous: nothing happens until the returned
 * {@link Mono} is subscribed to.
 */
public interface PessimisticLock {

    /**
     * Attempts to take the lock identified by {@code key}.
     *
     * @param key identifier of the lock to acquire
     * @return a {@code Mono} carrying the outcome of the attempt
     */
    Mono<Boolean> tryAcquire(String key);

    /**
     * Gives up the lock identified by {@code key}.
     *
     * @param key identifier of the lock to release
     * @return a {@code Mono} carrying the outcome of the release
     */
    Mono<Boolean> release(String key);
}
3. Implementation
To implement pessimistic locking we will use a special marker record (a single bin that simply stores the lock key) with a short expiration (to prevent hung locks). The idea is simple:
if there is a record in the storage for the given key, then the lock is held by some other concurrent service
if there is no record, then the lock is free and the current service can acquire it
@Slf4j
@RequiredArgsConsturctor
public class DefaultPessimisticLock implements PessimisticLock {
private final IAerospikeReactorClient client;
private final RetryProperties retryProperties;
private final LockProperties lockProperties;
private final AerospikeProperties aerospikeProperties;
@Override
public Mono<Boolean> tryAcquire(String key) {
Key lockKey = toLockKey(key);
Bin lockBin = toLockBin(key);
return Mono.defer(() -> client.put(acquirePolicy(), lockKey, lockBin).map(Objects::nonNull))
.retryWhen(Retries.aerospikeError(retryProperties))
.onErrorMap(error -> {
log.warn("Failed to acquire lock " + key, error);
return new PessimisticLockAcquireException("Failed to acquire lock " + key, error);
});
}
@Override
public Mono<Boolean> release(String key) {
Key lockKey = toLockKey(key);
return client.delete(releasePolicy(), lockKey)
.map(Objects::nonNull)
.onErrorResume(error -> {
log.warn("Failed to release lock " + key, error);
return Mono.just(Boolean.FALSE);
})
.defaultIfEmpty(Boolean.TRUE);
}
private Key toLockKey(String key) {
return new Key(aerospikeProperties.getNamespace(), aerospikeProperties.getSetName(), key);
}
private Bin toLockBin(String key) {
return new Bin(lockProperties.getBinName(), key);
}
private WritePolicy acquirePolicy() {
WritePolicy putPolicy = new WritePolicy();
putPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
putPolicy.expiration = lockProperties.getExpirationInSeconds();
return putPolicy;
}
private WritePolicy releasePolicy() {
WritePolicy deletePolicy = new WritePolicy();
deletePolicy.generationPolicy = GenerationPolicy.NONE;
return deletePolicy;
}
}
4. Testing
@RunWith(MockitoJUnitRunner.class)
public class DefaultPessimisticLockTest {
private static final String TEST_KEY = "123";
private static final AerospikeException AEROSPIKE_TIMEOUT_EXCEPTION = new AerospikeException(ResultCode.TIMEOUT, "Aerospike timeout");
private static final AerospikeException AERSOPIKE_KEY_EXISTS_EXCEPTION = new AerospikeException(ResultCode.KEY_EXISTS_ERROR, "Key exists");
@Mock
private IAerospikeReactorClient client;
@Spy
private AerospikeProperties aerospikeProperties = new AerospikeProperties();
@Spy
private RetryProperties retryProperties = new RetryProperties();
@Spy
private LockProperties lockProperties = new LockProperties();
@InjectMocks
private DefaultPessimisticLock pessimisticLock;
@Test
public void lockAcquireExceptionIsThrownIfTimeoutReachedAfterRetry() {
Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
.thenReturn(Mono.error(AEROSPIKE_TIMEOUT_EXCEPTION));
StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
.expectSubscription()
.thenAwait(Duration.ofMillis(1001))
.expectError(PessimisticLockAcquireException.class)
.verify();
}
@Test
public void lockIsAcquiredAfterRetryWithExponentialBackOff() {
Key testKey = new Key(aerospikeProperties.getNamespace(), transactionProperties.getSetName(), TEST_KEY);
Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
.thenReturn(Mono.error(AEROSPIKE_TIMEOUT_EXCEPTION))
.thenReturn(Mono.error(AEROSPIKE_TIMEOUT_EXCEPTION))
.thenReturn(Mono.error(AERSOPIKE_KEY_EXISTS_EXCEPTION))
.thenReturn(Mono.just(testKey));
StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
.expectSubscription()
.thenAwait(Duration.ofMillis(50))
.thenAwait(Duration.ofMillis(100))
.thenAwait(Duration.ofMillis(200))
.expectNext(Boolean.TRUE)
.expectComplete()
.verify();
}
@Test
public void lockAcquireExceptionIsThrownIfKeyExistsError() {
Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
.thenReturn(Mono.error(AERSOPIKE_KEY_EXISTS_EXCEPTION));
StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
.expectSubscription()
.thenAwait(Duration.ofMillis(1001))
.expectError(PessimisticLockAcquireException.class)
.verify();
}
@Test
public void lockIsAcquiredIfKeyBecomesAvailable() {
Key testKey = new Key(aerospikeProperties.getNamespace(), transactionProperties.getSetName(), TEST_KEY);
Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
.thenReturn(Mono.error(AERSOPIKE_KEY_EXISTS_EXCEPTION))
.thenReturn(Mono.error(AERSOPIKE_KEY_EXISTS_EXCEPTION))
.thenReturn(Mono.just(testKey));
StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
.expectSubscription()
.thenAwait(Duration.ofMillis(50))
.thenAwait(Duration.ofMillis(100))
.expectNext(Boolean.TRUE)
.expectComplete()
.verify();
}
@Test
public void lockIsAcquiredSuccessfully() {
Key testKey = new Key(aerospikeProperties.getNamespace(), transactionProperties.getSetName(), TEST_KEY);
Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
.thenReturn(Mono.just(testKey));
StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
.expectSubscription()
.expectNext(Boolean.TRUE)
.expectComplete()
.verify();
}
@Test
public void lockIsTreatedAsReleasedIfDoesNotExist() {
Mockito.when(client.delete(Mockito.any(WritePolicy.class), Mockito.any(Key.class)))
.thenReturn(Mono.empty());
StepVerifier.create(pessimisticLock.release(TEST_KEY))
.expectNext(Boolean.TRUE)
.expectComplete()
.verify();
}
@Test
public void lockIsNotReleasedIfExceptionDuringRelease() {
Mockito.when(client.delete(Mockito.any(WritePolicy.class), Mockito.any(Key.class)))
.thenReturn(Mono.error(AEROSPIKE_TIMEOUT_EXCEPTION));
StepVerifier.create(pessimisticLock.release(TEST_KEY))
.expectNext(Boolean.FALSE)
.expectComplete()
.verify();
}
@Test
public void lockIsReleasedSuccessfully() {
Key testKey = new Key(aerospikeProperties.getNamespace(), transactionProperties.getSetName(), TEST_KEY);
Mockito.when(client.delete(Mockito.any(WritePolicy.class), Mockito.any(Key.class)))
.thenReturn(Mono.just(testKey));
StepVerifier.create(pessimisticLock.release(TEST_KEY))
.expectNext(Boolean.TRUE)
.expectComplete()
.verify();
}
}
5. Using with Reactor
We need to emulate try-finally
semantics with Reactor operators. The code below achieves that goal:
/**
 * Runs {@code operationExecutor} while holding the lock for {@code key},
 * emulating try-finally: the lock is released whether the operation emits a
 * value, completes empty, or fails.
 *
 * @param key               identifier of the lock guarding the operation
 * @param data              input handed to the operation
 * @param operationExecutor operation to run once the lock has been acquired
 * @return the operation's result; the operation's error is re-signalled after
 *         the release, and an error from acquiring the lock is propagated
 *         without running the operation
 */
@Override
public <T, R> Mono<R> executeWithLock(String key, T data, OperationExecutor<T, R> operationExecutor) {
    // The emitted acquire flag is not inspected: the operation runs whenever
    // tryAcquire emits a value.
    return pessimisticLock.tryAcquire(key)
            .flatMap(lockAcquired -> operationExecutor.execute(data)
                    // Value produced: release first, then hand the value downstream.
                    .flatMap(operationResult -> pessimisticLock.release(key)
                            .then(Mono.just(operationResult)))
                    // No value produced: the flatMap above never ran, so release here.
                    .switchIfEmpty(Mono.defer(() -> pessimisticLock.release(key)
                            .then(Mono.<R>empty())))
                    // Failure: release, then re-signal the original error to the caller.
                    .onErrorResume(throwable -> pessimisticLock.release(key)
                            .then(Mono.<R>error(throwable)))
            );
}
6. Conclusion
Aerospike doesn’t have a built-in mechanism for pessimistic locking, so to achieve the required semantics one needs to implement locking directly.
Another piece of the puzzle is emulating try-finally
semantics with Reactor.