From d778f5579ad62a718172ddd92a3401a651cfa748 Mon Sep 17 00:00:00 2001 From: Patric Stout Date: Sun, 18 Jul 2021 15:58:41 +0200 Subject: [PATCH] Change: store NewGRFs in redis backend in format the Game Coordinator can read (#48) --- master_server/database/redis.py | 60 +++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/master_server/database/redis.py b/master_server/database/redis.py index 072dfb0..5a2a7b5 100644 --- a/master_server/database/redis.py +++ b/master_server/database/redis.py @@ -15,7 +15,10 @@ # Servers should announce every 15 minutes, so if we haven't seen a server # after 20 minutes, we can assume it is no longer running. -TTL = 60 * 20 +TTL_SERVER = 60 * 20 +# Give a bit of grace period to forget about NewGRFs, so server restarts don't +# bump the counter. +TTL_NEWGRF = TTL_SERVER + 60 def md5sum(value): @@ -46,11 +49,11 @@ async def check_session_key_token(self, session_key, token): if ms_token != str(token): return False - await self._redis.expire(f"ms-session-key:{session_key}", TTL) + await self._redis.expire(f"ms-session-key:{session_key}", TTL_SERVER) return True async def store_session_key_token(self, session_key, token): - await self._redis.set(f"ms-session-key:{session_key}", token, ex=TTL) + await self._redis.set(f"ms-session-key:{session_key}", token, ex=TTL_SERVER) async def server_online(self, session_key, server_ip, server_port, info): # Don't accept servers with empty revision or name. @@ -58,26 +61,64 @@ async def server_online(self, session_key, server_ip, server_port, info): return False # server_offline() doesn't get the session_key, so we need a reverse lookup. - await self._redis.set(f"ms-session-id:{server_ip}:{server_port}", session_key, ex=TTL) + await self._redis.set(f"ms-session-id:{server_ip}:{server_port}", session_key, ex=TTL_SERVER) # Create a server-id based on the first ip/port we see of this server. # This means the server-id remains mostly stable between restarts. server_id = await self._redis.get(f"ms-server-id:{session_key}") if server_id is None: server_id = _get_server_id(server_ip, server_port) - await self._redis.set(f"ms-server-id:{session_key}", server_id, ex=TTL) + await self._redis.set(f"ms-server-id:{session_key}", server_id, ex=TTL_SERVER) info["game_type"] = 1 # Public info["connection_type"] = 2 # Direct-IP + newgrfs = info["newgrfs"] + del info["newgrfs"] + + # Convert the NewGRF list to an indexed list, as expected by the + # Game Coordinator. + newgrfs_indexed = [] + for newgrf in newgrfs: + newgrf_lookup_str = await self._redis.get(f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}") + if newgrf_lookup_str is not None: + newgrf_lookup = json.loads(newgrf_lookup_str) + + # Make sure the entry lives a bit longer. + await self._redis.expire(f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}", TTL_NEWGRF) + newgrfs_indexed.append(newgrf_lookup["index"]) + continue + + newgrf_lookup = { + "index": await self._redis.incr("gc-newgrf-counter"), + "name": None, + } + res = await self._redis.set( + f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}", json.dumps(newgrf_lookup), nx=True, ex=TTL_NEWGRF + ) + if res is not None: + await self.add_to_stream("newgrf-added", {"index": newgrf_lookup["index"], "newgrf": newgrf}) + newgrfs_indexed.append(newgrf_lookup["index"]) + continue + + # Another instance sneaked in between our get and set, so fetch + # the key again. This time it is guaranteed to exist. + newgrf_lookup_str = await self._redis.get(f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}") + newgrf_lookup = json.loads(newgrf_lookup_str) + + await self._redis.expire(f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}", TTL_NEWGRF) + newgrfs_indexed.append(newgrf_lookup["index"]) + # Update the information of this server. - await self._redis.set(f"gc-server:{server_id}", json.dumps(info), ex=TTL) + await self._redis.set(f"gc-server-newgrf:{server_id}", json.dumps(newgrfs_indexed), ex=TTL_SERVER) + await self.add_to_stream("update-newgrf", {"server_id": server_id, "newgrfs_indexed": newgrfs_indexed}) + await self._redis.set(f"gc-server:{server_id}", json.dumps(info), ex=TTL_SERVER) await self.add_to_stream("update", {"server_id": server_id, "info": info}) # Track this IP based on the server_id. type = "ipv6" if isinstance(server_ip, ipaddress.IPv6Address) else "ipv4" res = await self._redis.set( - f"gc-direct-{type}:{server_id}", json.dumps({"ip": str(server_ip), "port": server_port}), ex=TTL + f"gc-direct-{type}:{server_id}", json.dumps({"ip": str(server_ip), "port": server_port}), ex=TTL_SERVER ) if res > 0: await self.add_to_stream( @@ -140,6 +181,11 @@ async def get_server_info_for_web(self, server_id): if info["connection_type"] == 1: # Do not list ConnectionType.ISOLATED servers. return None + newgrfs_indexed_str = await self._redis.get(f"gc-server-newgrf:{server_id}") + newgrfs_indexed = json.loads(newgrfs_indexed_str) + + info["newgrfs"] = newgrfs_indexed + entry = { "info": info, "server_id": server_id,