Skip to content

Decoupling caching from AsyncMultiMap. #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
</parent>

<groupId>io.reactiverse</groupId>
<artifactId>consul-cluster-manager</artifactId>
<artifactId>consul-clust
<version>1.2.1</version>
<name>Vert.x Consul Cluster Manager</name>
<url>https://github.com/reactiverse/consul-cluster-manager</url>
Expand Down
159 changes: 28 additions & 131 deletions src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.vertx.core.Promise;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.Json;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.cluster.AsyncMultiMap;
Expand All @@ -35,8 +34,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -70,28 +67,12 @@ public class ConsulAsyncMultiMap<K, V> extends ConsulMap<K, V> implements AsyncM

private final TaskQueue taskQueue = new TaskQueue();
private final KeyValueOptions kvOpts;
private final boolean preferConsistency;
/*
* Implementation of local IN-MEMORY multimap cache which is essentially concurrent hash map under the hood.
* Cache is enabled ONLY when {@code preferConsistency} is set to false i.e. availability (better latency) is preferred.
* If cache is enabled:
* Cache read operations happen synchronously by simply reading from {@link java.util.concurrent.ConcurrentHashMap}.
* Cache WRITE operations happen either:
* - through consul watch that monitors the consul kv store for updates (see https://www.consul.io/docs/agent/watches.html).
* - when consul agent acknowledges the success of write operation (local node's data gets immediately cached without even waiting for a watch to take place.)
* Note: local cache updates still might kick in through consul watch in case update succeeded in consul agent but wasn't yet acknowledged back to node. Eventually last write wins.
*/
private ConcurrentMap<K, ChoosableSet<V>> cache;

public ConsulAsyncMultiMap(String name, boolean preferConsistency, ClusterManagerInternalContext appContext) {

public ConsulAsyncMultiMap(String name, ClusterManagerInternalContext appContext) {
super(name, appContext);
this.preferConsistency = preferConsistency;
// options to make entries of this map ephemeral.
this.kvOpts = new KeyValueOptions().setAcquireSession(appContext.getEphemeralSessionId());
if (!preferConsistency) { // if cp is disabled then disable caching.
cache = new ConcurrentHashMap<>();
startListening();
}
}

private static String getRidOfNodeId(String consulKeyPath) {
Expand All @@ -112,7 +93,7 @@ public void remove(K k, V v, Handler<AsyncResult<Boolean>> completionHandler) {
.compose(aVoid -> getAll(keyPathForAllByAddress(k)))
.compose(consulEntries -> {
List<Future> futures = new ArrayList<>();
consulEntries.forEach(consulEntry -> futures.add(delete(consulEntry.getKey(), v, toChoosableSet(consulEntry.getValue()), consulEntry.getNodeId())));
consulEntries.forEach(consulEntry -> futures.add(doDelete(consulEntry.getKey(), v, toChoosableSet(consulEntry.getValue()), consulEntry.getNodeId())));
return CompositeFuture.all(futures).map(compositeFuture -> {
for (int i = 0; i < compositeFuture.size(); i++) {
boolean resAt = compositeFuture.resultAt(i);
Expand All @@ -137,7 +118,7 @@ public void removeAllMatching(Predicate<V> p, Handler<AsyncResult<Void>> complet
consulEntries.forEach(consulEntry ->
consulEntry.getValue().forEach(v -> {
if (p.test(v)) {
futures.add(delete(consulEntry.getKey(), v, toChoosableSet(consulEntry.getValue()), consulEntry.getNodeId()));
futures.add(doDelete(consulEntry.getKey(), v, toChoosableSet(consulEntry.getValue()), consulEntry.getNodeId()));
}
}));
return CompositeFuture.all(futures).compose(compositeFuture -> Future.<Void>succeededFuture());
Expand All @@ -160,63 +141,37 @@ public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> resultHandler) {
* @param entries holds entries to which the new entries will be added, these entries have to be queried first.
* @return {@link Future}
*/
private Future<Void> doAdd(K k, V v, Set<V> entries) {
return preferConsistency ? nonCacheableAdd(k, entries, v) : cacheableAdd(k, entries, v);
}

private Future<Void> cacheableAdd(K k, Set<V> entries, V sub) {
return nonCacheableAdd(k, entries, sub)
.compose(aVoid -> {
addEntryToCache(k, sub);
return succeededFuture();
});

}

private Future<Void> nonCacheableAdd(K k, Set<V> subs, V sub) {
Set<V> newOne = new HashSet<>(subs);
newOne.add(sub);
Future<Void> doAdd(K k, V v, Set<V> entries) {
Set<V> newOne = new HashSet<>(entries);
newOne.add(v);
return addToConsulKv(k, newOne, appContext.getNodeId())
.compose(aBoolean -> aBoolean ? succeededFuture() : failedFuture(sub.toString() + ": wasn't added to: " + name));
.compose(aBoolean -> aBoolean ? succeededFuture() : failedFuture(v.toString() + ": wasn't added to: " + name));
}

private Future<Boolean> addToConsulKv(K key, Set<V> vs, String nodeId) {
return asFutureString(key, vs, nodeId)
.compose(encodedValue -> putPlainValue(keyPathForAllByAddressAndByNodeId(key, nodeId), encodedValue, kvOpts));
}

/*
* We are wrapping async call into sync and execute it on the taskQueue. This way we maintain the order
* in which "get" tasks are executed.
* If we simply implement this method as : return preferConsistency ? nonCacheableGet(key) : cacheableGet(key);
* then {@link ClusteredEventBusTest.sendNoContext} will fail due to the fact async calls to get subs by key are unordered.
* TODO: Is there any way in vert.x ecosystem to execute tasks on the event loop by not giving up an order ?
/**
* Gets an entry from consul kv store
* @param key represents key of the entry (i.e. event bus address).
* @return set of subscribers.
*/
private Future<ChoosableSet<V>> doGet(K key) {

Future<ChoosableSet<V>> doGet(K key) {
/*
* We are wrapping async call into sync and execute it on the taskQueue. This way we maintain the order
* in which "get" tasks are executed.
* If we simply implement this method as : return preferConsistency ? nonCacheableGet(key) : cacheableGet(key);
* then {@link ClusteredEventBusTest.sendNoContext} will fail due to the fact async calls to get subs by key are unordered.
* TODO: Is there any way in vert.x ecosystem to execute tasks on the event loop by not giving up an order ?
*/
Promise<ChoosableSet<V>> out = Promise.promise();
VertxInternal vertxInternal = (VertxInternal) appContext.getVertx();
vertxInternal.getOrCreateContext().<ChoosableSet<V>>executeBlocking(event -> {
Future<ChoosableSet<V>> future = preferConsistency
? nonCacheableGet(key) : cacheableGet(key);
Future<ChoosableSet<V>> future = getAllByKey(keyPathForAllByAddress(key)).compose(vs -> succeededFuture(toChoosableSet(vs)));
ChoosableSet<V> choosableSet = completeAndGet(future, 5000);
event.complete(choosableSet);
}, taskQueue, res -> out.complete(res.result()));
return out.future();
}

private Future<ChoosableSet<V>> cacheableGet(K key) {
if (cache.containsKey(key)) return succeededFuture(cache.get(key));
else return nonCacheableGet(key)
.compose(vs -> {
addEntriesToCache(key, vs);
return succeededFuture(vs);
});
}

private Future<ChoosableSet<V>> nonCacheableGet(K key) {
return getAllByKey(keyPathForAllByAddress(key)).compose(vs -> succeededFuture(toChoosableSet(vs)));
}

/**
* Deletes then entry from consul kv store.
*
Expand All @@ -226,21 +181,7 @@ private Future<ChoosableSet<V>> nonCacheableGet(K key) {
* @param nodeId represents node id that the entry belongs to.
* @return {@link Future}
*/
private Future<Boolean> delete(K key, V value, ChoosableSet<V> from, String nodeId) {
return preferConsistency ? nonCacheableDelete(key, value, from, nodeId) : cacheableDelete(key, value, from, nodeId);
}

private Future<Boolean> cacheableDelete(K key, V value, ChoosableSet<V> from, String nodeId) {
return nonCacheableDelete(key, value, from, nodeId)
.compose(aBoolean -> {
if (aBoolean) {
removeEntryFromCache(key, value);
}
return succeededFuture(aBoolean);
});
}

private Future<Boolean> nonCacheableDelete(K key, V value, ChoosableSet<V> from, String nodeId) {
Future<Boolean> doDelete(K key, V value, ChoosableSet<V> from, String nodeId) {
if (from.remove(value)) {
if (from.isEmpty()) return deleteValueByKeyPath(keyPathForAllByAddressAndByNodeId(key, nodeId));
else return addToConsulKv(key, toHashSet(from), nodeId);
Expand All @@ -249,6 +190,11 @@ private Future<Boolean> nonCacheableDelete(K key, V value, ChoosableSet<V> from,
}
}

private Future<Boolean> addToConsulKv(K key, Set<V> vs, String nodeId) {
return asFutureString(key, vs, nodeId)
.compose(encodedValue -> putPlainValue(keyPathForAllByAddressAndByNodeId(key, nodeId), encodedValue, kvOpts));
}

/**
* Returns a consul key path used to fetch all entries (all subscribers of all event buses that are registered).
*/
Expand Down Expand Up @@ -322,53 +268,4 @@ private Set<V> toHashSet(ChoosableSet<V> set) {
set.forEach(hashSet::add);
return hashSet;
}

private void addEntryToCache(K key, V value) {
ChoosableSet<V> choosableSet = cache.get(key);
if (choosableSet == null) choosableSet = new ChoosableSet<>(1);
choosableSet.add(value);
cache.put(key, choosableSet);
if (log.isTraceEnabled()) {
log.trace("[" + appContext.getNodeId() + "]" + " Cache: " + name + " after put of " + key + " -> " + value + ": " + Json.encode(cache));
}
}

private void removeEntryFromCache(K key, V value) {
ChoosableSet<V> choosableSet = cache.get(key);
if (choosableSet == null) return;
choosableSet.remove(value);
if (choosableSet.isEmpty()) cache.remove(key);
else cache.put(key, choosableSet);
if (log.isTraceEnabled()) {
log.trace("[" + appContext.getNodeId() + "]" + " Cache: " + name + " after remove of " + key + " -> " + value + ": " + Json.encode(cache));
}
}

private void addEntriesToCache(K key, ChoosableSet<V> values) {
cache.put(key, values);
}

@Override
protected synchronized void entryUpdated(EntryEvent event) {
if (log.isTraceEnabled()) {
log.trace("[" + appContext.getNodeId() + "]" + " Entry: " + event.getEntry().getKey() + " is for " + event.getEventType());
}
ConsulEntry<K, Set<V>> entry;
try {
entry = asConsulEntry(event.getEntry().getValue());
} catch (Exception e) {
log.warn("Failed to decode: " + event.getEntry().getKey() + " -> " + event.getEntry().getValue(), e);
return;
}
switch (event.getEventType()) {
case WRITE:
entry.getValue().forEach(v -> addEntryToCache(entry.getKey(), v));
break;
case REMOVE:
entry.getValue().forEach(v -> removeEntryFromCache(entry.getKey(), v));
break;
default:
break;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (C) 2019 Roman Levytskyi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vertx.spi.cluster.consul.impl;

import io.vertx.core.Future;
import io.vertx.core.json.Json;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static io.vertx.core.Future.succeededFuture;
import static io.vertx.spi.cluster.consul.impl.ConversationUtils.asConsulEntry;

/**
* {@link ConsulAsyncMultiMap} implementation with caching capabilities.
* <p>
* Concurrent hash map is used to implement the local IN-MEMORY multimap cache.
* Cache is enabled ONLY when {@code preferConsistency} is set to false i.e. availability (better latency) is preferred.
* If cache is enabled:
* Cache read operations happen synchronously by simply reading from {@link java.util.concurrent.ConcurrentHashMap}.
* Cache WRITE operations happen either:
* - through consul watch that monitors the consul kv store for updates (see https://www.consul.io/docs/agent/watches.html).
* - when consul agent acknowledges the success of write operation (local node's data gets immediately cached without even waiting for a watch to take place.)
* Note: local cache updates still might kick in through consul watch in case update succeeded in consul agent but wasn't yet acknowledged back to node. Eventually last write wins.
*
* @author <a href="mailto:roman.levytskyi.oss@gmail.com">Roman Levytskyi</a>
*/
public class ConsulCacheableAsyncMultiMap<K, V> extends ConsulAsyncMultiMap<K, V> {

private final static Logger log = LoggerFactory.getLogger(ConsulCacheableAsyncMultiMap.class);

private ConcurrentMap<K, ChoosableSet<V>> cache;

public ConsulCacheableAsyncMultiMap(String name, ClusterManagerInternalContext appContext) {
super(name, appContext);
cache = new ConcurrentHashMap<>();
startListening();
}

@Override
Future<Void> doAdd(K k, V v, Set<V> entries) {
return super.doAdd(k, v, entries).compose(aVoid -> {
addEntryToCache(k, v);
return succeededFuture();
});
}

@Override
Future<ChoosableSet<V>> doGet(K key) {
ChoosableSet<V> cachedEntries = cache.get(key);
if (Objects.nonNull(cachedEntries)) {
return succeededFuture(cachedEntries);
} else {
return super.doGet(key).compose(vs -> {
addEntriesToCache(key, vs);
return succeededFuture(vs);
});
}
}

@Override
Future<Boolean> doDelete(K key, V value, ChoosableSet<V> from, String nodeId) {
return super.doDelete(key, value, from, nodeId).compose(isDeleted -> {
if (isDeleted) {
removeEntryFromCache(key, value);
}
return succeededFuture(isDeleted);
});
}

private void addEntryToCache(K key, V value) {
ChoosableSet<V> choosableSet = cache.get(key);
if (choosableSet == null) choosableSet = new ChoosableSet<>(1);
choosableSet.add(value);
cache.put(key, choosableSet);
if (log.isTraceEnabled()) {
log.trace("[" + appContext.getNodeId() + "]" + " Cache: " + name + " after put of " + key + " -> " + value + ": " + Json.encode(cache));
}
}

private void removeEntryFromCache(K key, V value) {
ChoosableSet<V> choosableSet = cache.get(key);
if (choosableSet == null) return;
choosableSet.remove(value);
if (choosableSet.isEmpty()) cache.remove(key);
else cache.put(key, choosableSet);
if (log.isTraceEnabled()) {
log.trace("[" + appContext.getNodeId() + "]" + " Cache: " + name + " after remove of " + key + " -> " + value + ": " + Json.encode(cache));
}
}

private void addEntriesToCache(K key, ChoosableSet<V> values) {
cache.put(key, values);
}

@Override
protected synchronized void entryUpdated(EntryEvent event) {
if (log.isTraceEnabled()) {
log.trace("[" + appContext.getNodeId() + "]" + " Entry: " + event.getEntry().getKey() + " is for " + event.getEventType());
}
ConsulEntry<K, Set<V>> entry;
try {
entry = asConsulEntry(event.getEntry().getValue());
} catch (Exception e) {
log.warn("Failed to decode: " + event.getEntry().getKey() + " -> " + event.getEntry().getValue(), e);
return;
}
switch (event.getEventType()) {
case WRITE:
entry.getValue().forEach(v -> addEntryToCache(entry.getKey(), v));
break;
case REMOVE:
entry.getValue().forEach(v -> removeEntryFromCache(entry.getKey(), v));
break;
default:
break;
}
}
}