Source code for kxg.multiplayer

from .errors import *
from .forums import Forum, IdFactory
from .actors import Actor

class ClientForum(Forum):
    """
    Forum used on the client side of a multiplayer game.

    Every message executed through this forum is sent over ``pipe`` and also
    applied to the local world right away.  Sent messages are cached until a
    ServerResponse for them arrives, so that they can be synced or undone if
    the server asks for it.
    """

    # NOTE(review): the ``{message}`` / ``{packet}`` placeholders passed to
    # info() below are deliberately *not* f-strings.  Presumably the logging
    # helper pulled in by ``from .errors import *`` fills them in from the
    # caller's local variables -- confirm before renaming those locals or
    # converting the strings.

    def __init__(self, pipe):
        """
        Take ownership of the pipe and set up the bookkeeping for responses.

        :param pipe: Connection to the server.  It is locked immediately.
        """
        super().__init__()

        self.pipe = pipe
        self.pipe.lock()

        from collections import OrderedDict

        # Filled in by receive_id_from_server().
        self.actor_id_factory = None
        # Numbers outgoing messages so responses can be matched to them.
        self.response_id_factory = IdFactory(0, 1)
        # Maps response id -> message, in the order the messages were sent.
        self.sent_message_cache = OrderedDict()

    def receive_id_from_server(self):
        """
        Listen for an id from the server.  Return True if an id has been
        received and False otherwise.

        At the beginning of a game, each client receives an IdFactory from
        the server.  This factory is used to give id numbers that are
        guaranteed to be unique to tokens that are created locally.  This
        method checks to see if such a factory has been received.  If it
        hasn't, this method does not block and immediately returns False.  If
        it has, this method returns True after saving the factory internally.
        At this point it is safe to call Game.start_game().  It is also safe
        to call this method as many times as you'd like after an id has been
        received.
        """
        if self.actor_id_factory is not None:
            return True

        for message in self.pipe.receive():
            if isinstance(message, IdFactory):
                self.actor_id_factory = message
                return True

        return False

    def connect_everyone(self, world, actors):
        """
        Connect the forum, the world, and the single local actor.

        :raises ApiUsageError: if no id factory has been received from the
            server yet (see receive_id_from_server()).
        """
        # Make sure this forum has been assigned an id from the server.

        if self.actor_id_factory is None:
            raise ApiUsageError("""\
                    ClientForum wasn't assigned an id number by the server
                    before the game started.

                    In a multiplayer game, each client must receive an id
                    number from the server before the game starts.  ServerActor
                    automatically sends this id as soon as Game.start_game() is
                    called on the server, but ClientForum must explicitly
                    receive it by calling receive_id_from_server().  This
                    method is non-blocking, so it has to be called repeatedly
                    until it returns true, indicating that an id was received.
                    """)

        # Make sure that this forum is only connected to one actor.

        assert len(actors) == 1
        self.actor = actors[0]

        # Connect the forum, world, and actors as usual.

        super().connect_everyone(world, actors)

    def execute_message(self, message):
        """
        Send a locally created message to the server and apply it locally.
        """
        # Cache the message and give it an id number the server can reference
        # in its response.  Messages are cached so they can be undone if they
        # are rejected by the server.  The id is necessary so the client forum
        # (i.e. this object) can associate each response with a cached message.

        message._set_server_response_id(self.response_id_factory.next())
        self.sent_message_cache[message._get_server_response_id()] = message

        # Relay the message to a ServerActor running on the server to update
        # the world on all of the other machines playing the game as well.

        self.pipe.send(message)
        self.pipe.deliver()

        # Have the message update the local world like usual.

        super().execute_message(message)

    def execute_sync(self, message):
        """
        Respond when the server indicates that the client is out of sync.

        The server can request a sync when this client sends a message that
        fails the check() on the server.  If the reason for the failure isn't
        very serious, then the server can decide to send it as usual in the
        interest of a smooth gameplay experience.  When this happens, the
        server sends out an extra response providing the clients with the
        information they need to resync themselves.
        """
        info("synchronizing message: {message}")

        # Synchronize the world.

        with self.world._unlock_temporarily():
            message._sync(self.world)
            self.world._react_to_sync_response(message)

        # Give the actors a chance to react to the sync.

        for actor in self.actors:
            actor._react_to_sync_response(message)

    def execute_undo(self, message):
        """
        Manage the response when the server rejects a message.

        An undo is required when this client sends a message that the server
        refuses to pass on to the other clients playing the game.  When this
        happens, the client must either undo the changes that the message made
        to the world before being sent, or crash.  Note that unlike sync
        requests, undo requests are only reported to the client that sent the
        offending message.
        """
        info("undoing message: {message}")

        # Roll back changes that the original message made to the world.

        with self.world._unlock_temporarily():
            message._undo(self.world)
            self.world._react_to_undo_response(message)

        # Give the actors a chance to react to the error.  For example, a
        # GUI actor might inform the user that there are connectivity
        # issues and that their last action was countermanded.

        for actor in self.actors:
            actor._react_to_undo_response(message)

    def on_start_game(self):
        """
        Push a MessageSerializer bound to this forum's world onto the pipe.
        """
        serializer = MessageSerializer(self.world)
        self.pipe.push_serializer(serializer)

    def on_update_game(self):
        """
        Flush outgoing messages, then handle everything the server sent.

        Incoming Message packets are executed locally (and synced if their
        attached response asks for it).  Incoming ServerResponse packets are
        attached to the cached message they refer to, after which the cache is
        drained from its newest entry backwards.
        """
        from .messages import Message

        # An attempt is made to immediately deliver any messages passed into
        # execute_message(), but sometimes it takes more than one try to send a
        # message.  So in case there are any messages waiting to be sent, the
        # code below attempts to clear the queue every frame.

        self.pipe.deliver()

        # For each message received from the server:

        for packet in self.pipe.receive():

            # If the incoming packet is a message, execute it on this client
            # and, if necessary, synchronize this client's world with the
            # server's.  Messages that were sent from this client will not
            # reappear here, so we don't need to worry about double-dipping.

            if isinstance(packet, Message):
                info("receiving message: {packet}")
                super().execute_message(packet)
                response = packet._get_server_response()
                if response and response.sync_needed:
                    self.execute_sync(packet)

            # If the incoming packet is a response to a message sent from this
            # client, find that message in the "sent message cache" and attach
            # the response to it.  The response is handled in the while loop
            # below (and not right here) to better handle weird cases that can
            # occur when several messages are sent between server responses.

            elif isinstance(packet, ServerResponse):
                message = self.sent_message_cache[packet.id]
                message._set_server_response(packet)

        # Try to clear the sent message cache:

        while self.sent_message_cache:
            # Look at the most recently sent message; popitem() below removes
            # that same (last) entry.
            message = self.sent_message_cache[next(reversed(self.sent_message_cache))]
            response = message._get_server_response()

            # Don't handle any response until responses for any messages that
            # were sent after it have been handled.  This keeps the world in a
            # sane state for every response.

            if response is None:
                break

            # If the server requested that a message sync or undo itself, then
            # do that.  Messages coming from any client may need to be synced,
            # but messages that need to be undone were sent by this client and
            # rejected by the server.

            if response.sync_needed:
                self.execute_sync(message)
            if response.undo_needed:
                self.execute_undo(message)

            # Now that the message has been fully handled, pop it off the
            # cache.

            self.sent_message_cache.popitem()

    def on_finish_game(self):
        """
        Pop the serializer off the pipe.
        """
        self.pipe.pop_serializer()

    def _assign_id_factories(self):
        """
        Return a dict mapping the single local actor to the id factory that
        was received from the server.
        """
        assert self.actor_id_factory is not None, msg("""\
                Can't assign id factories without an id number from the server.
                This should've been caught by ClientForum.connect_everyone().""")

        return {self.actor: self.actor_id_factory}
class ServerActor(Actor):
    """
    Actor that runs on the server and stands in for one remote client.

    Messages arriving over ``pipe`` are checked against the server's world,
    answered with a ServerResponse, and (unless they must be undone) executed
    through the forum.  Messages coming from the forum are relayed back over
    the pipe unless this actor's own client sent them.
    """

    # NOTE(review): the ``{message}`` / ``{self.id}`` placeholders passed to
    # info() and critical() are presumably expanded by the logging helpers
    # from the caller's locals -- keep the local names ``message`` and
    # ``self`` intact.

    def __init__(self, pipe):
        """
        Disable forum observation and take ownership of (and lock) the pipe.
        """
        super().__init__()
        self._disable_forum_observation()

        self.pipe = pipe
        self.pipe.lock()

    def send_message(self, message):
        """
        Not supported for this actor; always raises NotImplementedError.
        """
        raise NotImplementedError

    def on_start_game(self, num_players):
        """
        Push a MessageSerializer bound to this actor's world onto the pipe.
        """
        self.pipe.push_serializer(MessageSerializer(self.world))

    def on_update_game(self, dt):
        """
        Process every message received from the client, then flush the pipe.
        """
        from .messages import MessageCheck

        # Handle each message the connected client has sent.

        for message in self.pipe.receive():
            info("received message: {message}")

            # A message must come from the actor that owns this pipe.  This
            # actor gives its id to its client, so a mismatch should never
            # happen and most likely indicates a bug in the game engine.

            if not message.was_sent_by(self._id_factory):
                critical("ignoring message from player {self.id} claiming to be from player {message.sender_id}.")
                continue

            # Validate the message against the server's world.  The client
            # already passed this same check, so a failure here means the
            # client and the server have drifted out of sync.

            response = ServerResponse(message)

            try:
                message._check(self.world)
            except MessageCheck:
                response.sync_needed = True
            else:
                response.sync_needed = False

            # When a sync is needed, let the message decide whether syncing
            # is enough or whether it has to be undone on the sending client
            # (and not relayed at all).  The message may also stash whatever
            # information it needs to perform the sync later.

            if response.sync_needed:
                can_be_synced = message._prepare_sync(self.world, response)
                response.undo_needed = not can_be_synced

            # Tell the sending client how to treat its message.  That client
            # gets the response on its own and looks the original message up
            # in its cache: to sync or undo it if requested, or otherwise
            # just to clear it.  For all the other clients, the response is
            # attached to the message, but only if a sync is needed.

            self.pipe.send(response)

            # Unless the message has an irreparable sync error, execute it
            # on the server and relay it to all the other clients.

            if not response.undo_needed:
                self._forum.execute_message(message)

        # Flush the outgoing queue.  This runs every frame because a message
        # sometimes takes more than one try to send.

        self.pipe.deliver()

    def on_finish_game(self):
        """
        Pop the serializer off the pipe.
        """
        self.pipe.pop_serializer()

    def _set_forum(self, forum, id):
        """
        Register the forum as usual, then send ``id`` to the client.
        """
        super()._set_forum(forum, id)

        self.pipe.send(id)
        self.pipe.deliver()

    def _relay_message(self, message):
        """
        Relay messages from the forum on the server to the client represented
        by this actor.

        Messages that this actor's own client sent are not echoed back.
        """
        info("relaying message: {message}")

        if message.was_sent_by(self._id_factory):
            return

        self.pipe.send(message)
        self.pipe.deliver()

    def _react_to_message(self, message):
        """
        Don't ever change the world in response to a message.

        This method is called by the game engine to trigger callbacks tied by
        this actor to particular messages.  This is useful for ordinary
        actors, but remote actors are only meant to shuttle messages between
        clients and should never react to individual messages.
        """
        pass
class ServerResponse:
    """
    The server's verdict on one message sent by a client.

    ``id`` is the response id of the message the verdict refers to.
    ``sync_needed`` and ``undo_needed`` both start out False.
    """

    def __init__(self, message):
        """
        Create a response for ``message`` with both flags cleared.
        """
        self.sync_needed = False
        self.undo_needed = False
        self.id = message._get_server_response_id()

    def __repr__(self):
        """
        Show the class name and both flags; the id is not included.
        """
        template = "{}(sync_needed={}, undo_needed={})"
        class_name = self.__class__.__name__
        return template.format(class_name, self.sync_needed, self.undo_needed)
class MessageSerializer:
    """
    Pickle messages before they are sent over the network, and unpickle them
    when they are received.

    Tokens that have been added to the world are serialized using their ID,
    then replaced with the corresponding token from the remote world when the
    message is deserialized.
    """

    def __init__(self, world):
        """
        :param world: The world used to decide which tokens are serialized by
            id (pack) and to look tokens up by id again (unpack).
        """
        self.world = world

    def pack(self, message):
        """
        Pickle ``message`` and return the resulting bytes.

        Tokens that are in the world are written as persistent ids (their
        ``id``).  Tokens that the message is about to add to the world, and
        all non-token objects, are pickled normally.
        """
        from pickle import Pickler
        from io import BytesIO
        from .tokens import Token
        from .messages import Message

        buffer = BytesIO()
        delegate = Pickler(buffer)

        def persistent_id(token):
            # Anything that isn't a token is pickled the ordinary way.
            if not isinstance(token, Token):
                return None

            assert isinstance(message, Message), msg("""\
                    Both Message and ServerResponse objects can be serialized,
                    but only Messages can contain tokens.""")

            assert token.id, msg("""\
                    Every token should have an id by now.  Tokens that are in
                    the world should always have an id, and tokens that are
                    being added to the world should've been assigned an id by
                    Actor.send_message().""")

            if token in self.world:
                assert token not in message.tokens_to_add(), msg("""\
                        Actor.send_message() should've refused to send a
                        message that would add a token that's already in the
                        world.""")
                return token.id

            else:
                assert token in message.tokens_to_add(), msg("""\
                        Actor.send_message() should've refused to send a
                        message referencing tokens that aren't in the world and
                        aren't being added to the world.""")
                return None

        delegate.persistent_id = persistent_id
        delegate.dump(message)

        return buffer.getvalue()

    def unpack(self, packet):
        """
        Unpickle ``packet`` and return the resulting object.

        Persistent ids found in the stream are converted to int and resolved
        through ``self.world.get_token()``.
        """
        from pickle import Unpickler
        from io import BytesIO

        # SECURITY NOTE(review): the packet comes off the network and is
        # unpickled as-is.  Unpickling can execute arbitrary code, so this is
        # only safe when every peer is trusted -- confirm that assumption or
        # restrict the classes the Unpickler is allowed to load.

        buffer = BytesIO(packet)
        delegate = Unpickler(buffer)

        delegate.persistent_load = lambda id: self.world.get_token(int(id))

        return delegate.load()