Skip to content

Commit

Permalink
feat: various logical improvements to data syncing
Browse files Browse the repository at this point in the history
* Update data in Redis during world saves & commands
* Always set data time to 1 year regardless of sync mode
  • Loading branch information
WiIIiam278 committed Mar 7, 2025
1 parent e56041e commit 305f90f
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import net.william278.husksync.HuskSync;
import net.william278.husksync.data.Data;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.redis.RedisKeyType;
import net.william278.husksync.redis.RedisManager;
import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.user.User;
Expand Down Expand Up @@ -84,18 +83,17 @@ private void updateItems(@NotNull OnlineUser viewer, @NotNull Data.Items.Items i

// Create and pack the snapshot with the updated enderChest
final DataSnapshot.Packed snapshot = latestData.get().copy();
boolean pin = plugin.getSettings().getSynchronization().doAutoPin(DataSnapshot.SaveCause.ENDERCHEST_COMMAND);
snapshot.edit(plugin, (data) -> {
data.getEnderChest().ifPresent(enderChest -> enderChest.setContents(items));
data.setSaveCause(DataSnapshot.SaveCause.ENDERCHEST_COMMAND);
data.setPinned(
plugin.getSettings().getSynchronization().doAutoPin(DataSnapshot.SaveCause.ENDERCHEST_COMMAND)
);
data.setPinned(pin);
});

// Save data
final RedisManager redis = plugin.getRedisManager();
plugin.getDataSyncer().saveData(holder, snapshot, (user, data) -> {
redis.getUserData(user).ifPresent(d -> redis.setUserData(user, snapshot, RedisKeyType.TTL_1_YEAR));
redis.getUserData(user).ifPresent(d -> redis.setUserData(user, snapshot));
redis.sendUserDataUpdate(user, data);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import net.william278.husksync.HuskSync;
import net.william278.husksync.data.Data;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.redis.RedisKeyType;
import net.william278.husksync.redis.RedisManager;
import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.user.User;
Expand Down Expand Up @@ -85,18 +84,17 @@ private void updateItems(@NotNull OnlineUser viewer, @NotNull Data.Items.Items i

// Create and pack the snapshot with the updated inventory
final DataSnapshot.Packed snapshot = latestData.get().copy();
boolean pin = plugin.getSettings().getSynchronization().doAutoPin(DataSnapshot.SaveCause.INVENTORY_COMMAND);
snapshot.edit(plugin, (data) -> {
data.getInventory().ifPresent(inventory -> inventory.setContents(items));
data.setSaveCause(DataSnapshot.SaveCause.INVENTORY_COMMAND);
data.setPinned(
plugin.getSettings().getSynchronization().doAutoPin(DataSnapshot.SaveCause.INVENTORY_COMMAND)
);
data.setPinned(pin);
});

// Save data
final RedisManager redis = plugin.getRedisManager();
plugin.getDataSyncer().saveData(holder, snapshot, (user, data) -> {
redis.getUserData(user).ifPresent(d -> redis.setUserData(user, snapshot, RedisKeyType.TTL_1_YEAR));
redis.getUserData(user).ifPresent(d -> redis.setUserData(user, snapshot));
redis.sendUserDataUpdate(user, data);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected CommandUser adapt(net.william278.uniform.CommandUser user) {
}

@NotNull
@SuppressWarnings("SameParameterValue")
protected <S> ArgumentElement<S, OnlineUser> onlineUser(@NotNull String name) {
return new ArgumentElement<>(name, reader -> {
final String username = reader.readString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.mojang.brigadier.exceptions.CommandSyntaxException;
import net.william278.husksync.HuskSync;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.redis.RedisKeyType;
import net.william278.husksync.redis.RedisManager;
import net.william278.husksync.user.CommandUser;
import net.william278.husksync.user.OnlineUser;
Expand Down Expand Up @@ -154,7 +153,7 @@ private void restoreSnapshot(@NotNull CommandUser executor, @NotNull User user,
// Save data
final RedisManager redis = plugin.getRedisManager();
plugin.getDataSyncer().saveData(user, data, (u, s) -> {
redis.getUserData(u).ifPresent(d -> redis.setUserData(u, s, RedisKeyType.TTL_1_YEAR));
redis.getUserData(u).ifPresent(d -> redis.setUserData(u, s));
redis.sendUserDataUpdate(u, s);
plugin.getLocales().getLocale("data_restored", u.getName(), u.getUuid().toString(),
s.getShortId(), s.getId().toString()).ifPresent(executor::sendMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ protected final void saveOnWorldSave(@NotNull List<OnlineUser> usersInWorld) {
}
usersInWorld.stream()
.filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc())
.forEach(user -> plugin.getDataSyncer().saveData(
user, user.createSnapshot(DataSnapshot.SaveCause.WORLD_SAVE)
.forEach(user -> plugin.getDataSyncer().saveCurrentUserData(
user, DataSnapshot.SaveCause.WORLD_SAVE
));
}

Expand All @@ -94,12 +94,13 @@ protected final void saveOnWorldSave(@NotNull List<OnlineUser> usersInWorld) {
protected void saveOnPlayerDeath(@NotNull OnlineUser user, @NotNull Data.Items items) {
final SaveOnDeathSettings settings = plugin.getSettings().getSynchronization().getSaveOnDeath();
if (plugin.isDisabling() || !settings.isEnabled() || plugin.isLocked(user.getUuid())
|| user.isNpc() || (!settings.isSaveEmptyItems() && items.isEmpty())) {
|| user.isNpc() || (!settings.isSaveEmptyItems() && items.isEmpty())) {
return;
}

// We don't persist this to Redis for syncing, as this snapshot is from a state they won't be in post-respawn
final DataSnapshot.Packed snapshot = user.createSnapshot(DataSnapshot.SaveCause.DEATH);
snapshot.edit(plugin, (data -> data.getInventory().ifPresent(inventory -> inventory.setContents(items))));
snapshot.edit(plugin, (data -> data.getInventory().ifPresent(inv -> inv.setContents(items))));
plugin.getDataSyncer().saveData(user, snapshot);
}

Expand All @@ -108,16 +109,12 @@ protected void saveOnPlayerDeath(@NotNull OnlineUser user, @NotNull Data.Items i
* Handle the plugin disabling
*/
public void handlePluginDisable() {
// Save for all online players
// Save for all online players.
plugin.getOnlineUsers().stream()
.filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc())
.forEach(user -> {
plugin.lockPlayer(user.getUuid());
plugin.getDataSyncer().saveData(
user,
user.createSnapshot(DataSnapshot.SaveCause.SERVER_SHUTDOWN),
(saved, data) -> plugin.getRedisManager().clearUserData(saved)
);
plugin.getDataSyncer().saveCurrentUserData(user, DataSnapshot.SaveCause.SERVER_SHUTDOWN);
});

// Close outstanding connections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void initialize() throws IllegalStateException {
jedisPool.getResource().ping();
} catch (JedisException e) {
throw new IllegalStateException("Failed to establish connection with Redis. "
+ "Please check the supplied credentials in the config file", e);
+ "Please check the supplied credentials in the config file", e);
}

// Subscribe using a thread (rather than a task)
Expand Down Expand Up @@ -213,6 +213,8 @@ protected void sendMessage(@NotNull String channel, @NotNull String message) {

public void sendUserDataUpdate(@NotNull User user, @NotNull DataSnapshot.Packed data) {
plugin.runAsync(() -> {
this.setUserData(user, data);

final RedisMessage redisMessage = RedisMessage.create(user.getUuid(), data.asBytes(plugin));
redisMessage.dispatch(plugin, RedisMessage.Type.UPDATE_USER_DATA);
});
Expand All @@ -226,6 +228,7 @@ public CompletableFuture<Optional<DataSnapshot.Packed>> getUserData(@NotNull UUI
.orElse(this.requestData(requestId, user));
}

// Request a user's dat x-server
private CompletableFuture<Optional<DataSnapshot.Packed>> requestData(@NotNull UUID requestId, @NotNull User user) {
final CompletableFuture<Optional<DataSnapshot.Packed>> future = new CompletableFuture<>();
pendingRequests.put(requestId, future);
Expand All @@ -247,19 +250,13 @@ private CompletableFuture<Optional<DataSnapshot.Packed>> requestData(@NotNull UU
});
}

/**
* Set a user's data to Redis
*
* @param user the user to set data for
* @param data the user's data to set
* @param timeToLive The time to cache the data for
*/
// Set a user's data to Redis
@Blocking
public void setUserData(@NotNull User user, @NotNull DataSnapshot.Packed data, int timeToLive) {
public void setUserData(@NotNull User user, @NotNull DataSnapshot.Packed data) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(
getKey(RedisKeyType.LATEST_SNAPSHOT, user.getUuid(), clusterId),
timeToLive,
RedisKeyType.TTL_1_YEAR,
data.asBytes(plugin)
);
plugin.debug(String.format("[%s] Set %s key on Redis", user.getName(), RedisKeyType.LATEST_SNAPSHOT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import net.william278.husksync.api.HuskSyncAPI;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.database.Database;
import net.william278.husksync.redis.RedisKeyType;
import net.william278.husksync.redis.RedisManager;
import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.user.User;
Expand Down Expand Up @@ -104,7 +103,7 @@ public void terminate() {
public void saveCurrentUserData(@NotNull OnlineUser onlineUser, @NotNull DataSnapshot.SaveCause cause) {
this.saveData(
onlineUser, onlineUser.createSnapshot(cause),
(user, data) -> getRedis().setUserData(user, data, RedisKeyType.TTL_10_SECONDS)
(user, data) -> getRedis().setUserData(user, data)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import net.william278.husksync.HuskSync;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.redis.RedisKeyType;
import net.william278.husksync.user.OnlineUser;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -63,7 +62,7 @@ public void syncSaveUserData(@NotNull OnlineUser onlineUser) {
getRedis().setUserServerSwitch(onlineUser);
saveData(
onlineUser, onlineUser.createSnapshot(DataSnapshot.SaveCause.DISCONNECT),
(user, data) -> getRedis().setUserData(user, data, RedisKeyType.TTL_10_SECONDS)
(user, data) -> getRedis().setUserData(user, data)
);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import net.william278.husksync.HuskSync;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.redis.RedisKeyType;
import net.william278.husksync.user.OnlineUser;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -62,7 +61,7 @@ public void syncSaveUserData(@NotNull OnlineUser onlineUser) {
plugin.runAsync(() -> saveData(
onlineUser, onlineUser.createSnapshot(DataSnapshot.SaveCause.DISCONNECT),
(user, data) -> {
getRedis().setUserData(user, data, RedisKeyType.TTL_1_YEAR);
getRedis().setUserData(user, data);
getRedis().setUserCheckedOut(user, false);
}
));
Expand Down

0 comments on commit 305f90f

Please sign in to comment.