57
57
import com .nukkitx .protocol .bedrock .data .entity .EntityData ;
58
58
import com .nukkitx .protocol .bedrock .data .entity .EntityFlag ;
59
59
import com .nukkitx .protocol .bedrock .packet .*;
60
+ import io .netty .channel .EventLoop ;
60
61
import it .unimi .dsi .fastutil .ints .*;
61
62
import it .unimi .dsi .fastutil .longs .Long2ObjectMap ;
62
- import it .unimi .dsi .fastutil .longs .Long2ObjectMaps ;
63
63
import it .unimi .dsi .fastutil .longs .Long2ObjectOpenHashMap ;
64
64
import it .unimi .dsi .fastutil .objects .Object2ObjectOpenHashMap ;
65
65
import it .unimi .dsi .fastutil .objects .ObjectIterator ;
102
102
import java .net .InetSocketAddress ;
103
103
import java .nio .charset .StandardCharsets ;
104
104
import java .util .*;
105
- import java .util .concurrent .*;
105
+ import java .util .concurrent .CompletableFuture ;
106
+ import java .util .concurrent .CompletionException ;
107
+ import java .util .concurrent .ScheduledFuture ;
108
+ import java .util .concurrent .TimeUnit ;
106
109
import java .util .concurrent .atomic .AtomicInteger ;
107
110
108
111
@ Getter
109
112
public class GeyserSession implements CommandSender {
110
113
111
114
private final GeyserConnector connector ;
112
115
private final UpstreamSession upstream ;
116
+ /**
117
+ * The loop where all packets and ticking is processed to prevent concurrency issues.
118
+ */
119
+ private final EventLoop eventLoop ;
113
120
private TcpClientSession downstream ;
114
121
@ Setter
115
122
private AuthData authData ;
@@ -158,11 +165,6 @@ public class GeyserSession implements CommandSender {
158
165
@ Getter (AccessLevel .NONE )
159
166
private final AtomicInteger itemNetId = new AtomicInteger (2 );
160
167
161
- @ Getter (AccessLevel .NONE )
162
- private final Object inventoryLock = new Object ();
163
- @ Getter (AccessLevel .NONE )
164
- private CompletableFuture <Void > inventoryFuture ;
165
-
166
168
@ Setter
167
169
private ScheduledFuture <?> craftingGridFuture ;
168
170
@@ -183,8 +185,8 @@ public class GeyserSession implements CommandSender {
183
185
@ Setter
184
186
private ItemMappings itemMappings ;
185
187
186
- private final Map <Vector3i , SkullPlayerEntity > skullCache = new ConcurrentHashMap <>();
187
- private final Long2ObjectMap <ClientboundMapItemDataPacket > storedMaps = Long2ObjectMaps . synchronize ( new Long2ObjectOpenHashMap <>() );
188
+ private final Map <Vector3i , SkullPlayerEntity > skullCache = new Object2ObjectOpenHashMap <>();
189
+ private final Long2ObjectMap <ClientboundMapItemDataPacket > storedMaps = new Long2ObjectOpenHashMap <>();
188
190
189
191
/**
190
192
* Stores the map between Java and Bedrock biome network IDs.
@@ -426,9 +428,10 @@ public class GeyserSession implements CommandSender {
426
428
427
429
private MinecraftProtocol protocol ;
428
430
429
- public GeyserSession (GeyserConnector connector , BedrockServerSession bedrockServerSession ) {
431
+ public GeyserSession (GeyserConnector connector , BedrockServerSession bedrockServerSession , EventLoop eventLoop ) {
430
432
this .connector = connector ;
431
433
this .upstream = new UpstreamSession (bedrockServerSession );
434
+ this .eventLoop = eventLoop ;
432
435
433
436
this .advancementsCache = new AdvancementsCache (this );
434
437
this .bookEditCache = new BookEditCache (this );
@@ -447,7 +450,6 @@ public GeyserSession(GeyserConnector connector, BedrockServerSession bedrockServ
447
450
448
451
this .playerInventory = new PlayerInventory ();
449
452
this .openInventory = null ;
450
- this .inventoryFuture = CompletableFuture .completedFuture (null );
451
453
this .craftingRecipes = new Int2ObjectOpenHashMap <>();
452
454
this .unlockedRecipes = new ObjectOpenHashSet <>();
453
455
this .lastRecipeNetId = new AtomicInteger (1 );
@@ -664,7 +666,7 @@ private void connectDownstream() {
664
666
boolean floodgate = this .remoteAuthType == AuthType .FLOODGATE ;
665
667
666
668
// Start ticking
667
- tickThread = connector . getGeneralThreadPool () .scheduleAtFixedRate (this ::tick , 50 , 50 , TimeUnit .MILLISECONDS );
669
+ tickThread = eventLoop .scheduleAtFixedRate (this ::tick , 50 , 50 , TimeUnit .MILLISECONDS );
668
670
669
671
downstream = new TcpClientSession (this .remoteAddress , this .remotePort , protocol );
670
672
disableSrvResolving ();
@@ -1095,39 +1097,6 @@ private void startGame() {
1095
1097
upstream .sendPacket (startGamePacket );
1096
1098
}
1097
1099
1098
- /**
1099
- * Adds a new inventory task.
1100
- * Inventory tasks are executed one at a time, in order.
1101
- *
1102
- * @param task the task to run
1103
- */
1104
- public void addInventoryTask (Runnable task ) {
1105
- synchronized (inventoryLock ) {
1106
- inventoryFuture = inventoryFuture .thenRun (task ).exceptionally (throwable -> {
1107
- GeyserConnector .getInstance ().getLogger ().error ("Error processing inventory task" , throwable .getCause ());
1108
- return null ;
1109
- });
1110
- }
1111
- }
1112
-
1113
- /**
1114
- * Adds a new inventory task with a delay.
1115
- * The delay is achieved by scheduling with the Geyser general thread pool.
1116
- * Inventory tasks are executed one at a time, in order.
1117
- *
1118
- * @param task the delayed task to run
1119
- * @param delayMillis delay in milliseconds
1120
- */
1121
- public void addInventoryTask (Runnable task , long delayMillis ) {
1122
- synchronized (inventoryLock ) {
1123
- Executor delayedExecutor = command -> GeyserConnector .getInstance ().getGeneralThreadPool ().schedule (command , delayMillis , TimeUnit .MILLISECONDS );
1124
- inventoryFuture = inventoryFuture .thenRunAsync (task , delayedExecutor ).exceptionally (throwable -> {
1125
- GeyserConnector .getInstance ().getLogger ().error ("Error processing inventory task" , throwable .getCause ());
1126
- return null ;
1127
- });
1128
- }
1129
- }
1130
-
1131
1100
/**
1132
1101
* @return the next Bedrock item network ID to use for a new item
1133
1102
*/
@@ -1229,7 +1198,18 @@ public void sendUpstreamPacketImmediately(BedrockPacket packet) {
1229
1198
* @param packet the java edition packet from MCProtocolLib
1230
1199
*/
1231
1200
public void sendDownstreamPacket (Packet packet ) {
1232
- if (downstream != null && (protocol .getSubProtocol ().equals (SubProtocol .GAME ) || packet .getClass () == LoginPluginResponsePacket .class )) {
1201
+ if (!closed && this .downstream != null ) {
1202
+ EventLoop eventLoop = this .downstream .getChannel ().eventLoop ();
1203
+ if (eventLoop .inEventLoop ()) {
1204
+ sendDownstreamPacket0 (packet );
1205
+ } else {
1206
+ eventLoop .execute (() -> sendDownstreamPacket0 (packet ));
1207
+ }
1208
+ }
1209
+ }
1210
+
1211
+ private void sendDownstreamPacket0 (Packet packet ) {
1212
+ if (protocol .getSubProtocol ().equals (SubProtocol .GAME ) || packet .getClass () == LoginPluginResponsePacket .class ) {
1233
1213
downstream .send (packet );
1234
1214
} else {
1235
1215
connector .getLogger ().debug ("Tried to send downstream packet " + packet .getClass ().getSimpleName () + " before connected to the server" );
0 commit comments