Source code for kxg.tokens

#!/usr/bin/env python3

import contextlib

from .errors import *
from .forums import ForumObserver
from .actors import require_actor

[docs]def read_only(method): """ Indicate that a given `Token` method will not change the state of the world, and is therefore safe to call at any time. """ setattr(method, '_kxg_read_only', True) return method
[docs]def watch_token(method): """ Mark a token extension method that should automatically be called when a token method of the same name is called. This decorator must only be used on `TokenExtension` methods, otherwise it will silently do nothing. The reason is that the decorator itself can't do anything but label the given method, because the token to watch isn't known at the time of decoration. The method is actually setup to watch a token in the `TokenExtension` constructor, which searches for the label added here. But other classes won't make this search and will silently do nothing. """ method._kxg_watch_token = True return method
[docs]class TokenSafetyChecks(type): """ Add checks to make sure token methods are being called safely. In order to keep multiplayer games in sync, the world should only be modified at particular times (e.g. token update methods and messages). The purpose of this metaclass is to stop you from accidentally trying to modify the world outside of these defined times, because doing so would lead to subtle synchronization bugs that could be hard to find. The engine indicates when it is safe to modify the world by setting a boolean lock flag in the world. This metaclass adds a bit of logic to non-read-only token methods that makes sure the world is unlocked before continuing. The `read_only` decorator can be used to indicate which methods are read-only, and are therefore excluded from these checks. The checks configured by this metaclass help find bugs, but may also incur unnecessary computational expense once the game has been fully debugged. For this reason, you can skip the checks by invoking python with optimization enabled (i.e. passing -O). """
[docs] def __new__(meta, name, bases, members): if __debug__: meta.add_safety_checks(members) return super().__new__(meta, name, bases, members)
[docs] @classmethod def add_safety_checks(meta, members): """ Iterate through each member of the class being created and add a safety check to every method that isn't marked as read-only. """ for member_name, member_value in members.items(): members[member_name] = meta.add_safety_check( member_name, member_value)
[docs] @staticmethod def add_safety_check(member_name, member_value): """ If the given member is a method that is public (i.e. doesn't start with an underscore) and hasn't been marked as read-only, replace it with a version that will check to make sure the world is locked. This ensures that methods that alter the token are only called from update methods or messages. """ import functools from types import FunctionType # Bail if the given member is read-only, private, or not a method. is_method = isinstance(member_value, FunctionType) is_read_only = hasattr(member_value, '_kxg_read_only') is_private = member_name.startswith('_') if not is_method or is_read_only or is_private: return member_value def safety_checked_method(self, *args, **kwargs): """ Make sure that the token the world is locked before a non-read-only method is called. """ # Because these checks are pretty magical, I want to be really # careful to avoid raising any exceptions other than the check # itself (which comes with a very clear error message). Here, that # means using getattr() to make sure the world attribute actually # exists. For example, there's nothing wrong with the following # code, but it does call a safety-checked method before the world # attribute is defined: # # class MyToken(kxg.Token): # def __init__(self): # self.init_helper() # super().__init__() world = getattr(self, 'world', None) if world and world.is_locked(): nonlocal member_name raise ApiUsageError("""\ attempted unsafe invocation of {self.__class__.__name__}.{member_name}(). This error brings attention to situations that might cause synchronization issues in multiplayer games. The {member_name}() method is not marked as read-only, but was invoked from outside the context of a message. This means that if {member_name}() makes any changes to the world, those changes will not be propagated. If {member_name}() is actually read-only, label it with the @kxg.read_only decorator.""") # After making that check, call the method as usual. return member_value(self, *args, **kwargs) # Preserve any "forum observer" decorations that have been placed on # the method and restore the method's original name and module strings, # to make inspection and debugging a little easier. functools.update_wrapper( safety_checked_method, member_value, assigned=functools.WRAPPER_ASSIGNMENTS + ( '_kxg_subscribe_to_message', '_kxg_subscribe_to_sync_response', '_kxg_subscribe_to_undo_response', ) ) return safety_checked_method
[docs]class TokenExtension(ForumObserver):
[docs] def __init__(self, actor, token): super().__init__() self.actor = actor self.token = token # Iterate through all of the extension methods to find ones wanting to # "watch" the token, then configure the token to call these methods # whenever a token method of the same name is called. from inspect import getmembers, ismethod for method_name, method in getmembers(self, ismethod): # Methods with the '_kxg_watch_token' attribute set should be set # up to watch the token. This attribute is typically set using the # @watch_token decorator. if hasattr(method, '_kxg_watch_token'): token.watch_method(method_name, method)
[docs] def __rshift__(self, message): return self.send_message(message)
[docs] def send_message(self, message): return self.actor.send_message(message)
[docs]class Token(ForumObserver, metaclass=TokenSafetyChecks):
[docs] class WatchedMethod:
[docs] def __init__(self, method): self.method = method self.watchers = []
[docs] def __call__(self, *args, **kwargs): self.method(*args, **kwargs) for watcher in self.watchers: watcher(*args, **kwargs)
[docs] def add_watcher(self, watcher): self.watchers.append(watcher)
[docs] def __init__(self): super().__init__() self._id = None self._world = None self._extensions = {} self._disable_forum_observation()
[docs] def __repr__(self, **kwargs): attrs = {'id': getattr(self, '_id', None), **kwargs} attrs_fmt = ', '.join( '{}={}'.format(k, repr(v)) for k, v in attrs.items() ) return '{}({})'.format(self.__class__.__name__, attrs_fmt)
[docs] def __getstate__(self): state = super().__getstate__() del state['_world'] del state['_extensions'] return state
[docs] def __setstate__(self, state): Token.__init__(self) super().__setstate__(state)
[docs] def __extend__(self): return {}
@property def id(self): return self._id @property def world(self): return self._world @property def has_id(self): return self.id is not None @property def has_world(self): assert (not self.world) or (self in self.world), msg("""\ If a token has a reference to the world, it should be in the world.""") return self.world is not None
[docs] @read_only def has_extension(self, actor): require_actor(actor) return actor in self._extensions
[docs] @read_only def get_extension(self, actor): require_actor(actor) return self._extensions[actor]
[docs] @read_only def get_extensions(self): return list(self._extensions.values())
[docs] @read_only def watch_method(self, method_name, callback): """ Register the given callback to be called whenever the method with the given name is called. You can easily take advantage of this feature in token extensions by using the `watch_token` decorator. """ # Make sure a token method with the given name exists, and complain if # nothing is found. try: method = getattr(self, method_name) except AttributeError: raise ApiUsageError("""\ {self.__class__.__name__} has no such method {method_name}() to watch. This error usually means that you used the @watch_token decorator on a method of a token extension class that didn't match the name of any method in the corresponding token class. Check for typos.""") # Wrap the method in a WatchedMethod object, if that hasn't already # been done. This object manages a list of callback method and takes # responsibility for calling them after the method itself has been # called. if not isinstance(method, Token.WatchedMethod): setattr(self, method_name, Token.WatchedMethod(method)) method = getattr(self, method_name) # Add the given callback to the watched method. method.add_watcher(callback)
[docs] def on_add_to_world(self, world): pass
[docs] def on_update_game(self, dt): pass
[docs] @read_only def on_report_to_referee(self, reporter): pass
[docs] def on_remove_from_world(self): pass
[docs] def _give_id(self, id_factory): require_token(self) from .forums import IdFactory assert isinstance(id_factory, IdFactory), msg("""\ The argument to Token._give_id() should be an IdFactory. This method should also only be called by the game engine itself.""") assert not self.has_id, msg("""\ Can't give {self} and id because it already has one. Actor.send_message() should've refused to send a message that would add a duplicate token to the world.""") self._id = id_factory.next()
[docs] def _check_if_forum_observation_enabled(self): """ Give a helpful error if the user attempts to subscribe or unsubscribe from messages while the token is not registered with a world. This can easily happen if the user attempts to subscribe to messages in the constructor. However, because the constructor is only called on one client and message handlers cannot be pickled, subscribing at this time would create hard-to-find synchronization bugs. """ try: super()._check_if_forum_observation_enabled() except ApiUsageError: raise ApiUsageError("""\ Token {self} can't subscribe to messages now. Tokens must be added to the world before they can subscribe to (or unsubscribe from) messages, because subscriptions can't be pickled and sent over the network. So any subscriptions a token makes while it's not part of the world won't be communicated to each machine playing the game. You are most likely getting this error because you tried to subscribe to messages in the constructor of a Token subclass. You can't do that, but instead you can either make your subscriptions in the on_add_to_world() callback or you can label your handler methods with the @subscribe_to_message decorator.""")
[docs] def _add_to_world(self, world, actors): self._world = world self._enable_forum_observation() self._create_extensions(actors) self.on_add_to_world(world)
[docs] def _create_extensions(self, actors): self._extensions = {} extension_classes = self.__extend__() for actor in actors: actor_class = type(actor) extension_class = extension_classes.get(actor_class) if extension_class: # Raise an easy-to-understand error if the extension class's # constructor takes something other than (self, actor, token). # An error would be raised anyway as soon as we try to # instantiate the extension, but that error would be hard to # understand because it wouldn't contain the name of the # offending extension and would come from pretty deep in the # game engine. from inspect import getfullargspec argspec = getfullargspec(extension_class.__init__) if len(argspec.args) != 3: raise ApiUsageError("""\ the {extension_class.__name__} constructor doesn't take the right arguments. Token extension constructors must take exactly three arguments: self, actor, and token. These are the arguments provided by tokens when they automatically instantiate their extensions. Fix this error by making the {extension_class} constructor compatible with these arguments.""") # Instantiate the extension and store a reference to it. extension = extension_class(actor, self) self._extensions[actor] = extension
[docs] def _remove_from_world(self): """ Clear all the internal data the token needed while it was part of the world. Note that this method doesn't actually remove the token from the world. That's what `World._remove_token` does. This method is just responsible for setting the internal state of the token being removed. """ self.on_remove_from_world() self._extensions = {} self._disable_forum_observation() self._world = None self._id = None
[docs]class World(Token): """ Manage all of the tokens participating in the game. """
[docs] def __init__(self): super().__init__() self._id = 0 self._tokens = {} self._actors = [] self._is_locked = True self._has_game_ended = False with self._unlock_temporarily(): self._add_token(self)
[docs] def __repr__(self): return '{}()'.format(self.__class__.__name__)
[docs] def __iter__(self): # Make a copy of self._tokens.values() because it's possible for tokens # to be added or removed from the world while the world is being # iterated through. Concretely, this can happen when a token extension # sends a message to add or remove a token during on_update_game(). return (x for x in list(self._tokens.values()) if x is not self)
[docs] def __len__(self): return len(self._tokens)
[docs] def __contains__(self, token_or_id): id = token_or_id.id if isinstance(token_or_id, Token) else token_or_id return id in self._tokens
[docs] def __getstate__(self): raise ApiUsageError("""\ can't pickle the world. The world should never have to be pickled and sent over the network, because each machine starts with its own world and is kept in sync by the messaging system. But unless you are explicitly trying to pickle the world on your own, this error is more likely to be the symptom of a major bug in the messaging system that is preventing it from correctly deciding which tokens need to be pickled.""")
[docs] def __setstate__(self, state): raise AssertionError("""\ World.__getstate__ should've refused to pickle the world.""")
[docs] @read_only def get_token(self, id): """ Return the token with the given id. If no token with the given id is registered to the world, an `IndexError` is raised. """ return self._tokens[id]
[docs] @read_only def get_last_id(self): """ Return the largest token id registered with the world. If no tokens have been added to the world, the id for the world itself (0) is returned. This means that the first "real" token id is 1. """ return max(self._tokens)
[docs] @read_only def is_locked(self): """ Return True if the world is currently allowed to be modified. """ return self._is_locked
[docs] def end_game(self): self._has_game_ended = True
[docs] @read_only def has_game_ended(self): """ Return True if the game has ended. """ return self._has_game_ended
[docs] def on_start_game(self): pass
[docs] def on_update_game(self, dt): for token in self: token.on_update_game(dt)
[docs] def on_finish_game(self): pass
[docs] @contextlib.contextmanager def _unlock_temporarily(self): """ Allow tokens to modify the world for the duration of a with-block. It's important that tokens only modify the world at appropriate times, otherwise the changes they make may not be communicated across the network to other clients. To help catch and prevent these kinds of errors, the game engine keeps the world locked most of the time and only briefly unlocks it (using this method) when tokens are allowed to make changes. When the world is locked, token methods that aren't marked as being read-only can't be called. When the world is unlocked, any token method can be called. These checks can be disabled by running python with optimization enabled. You should never call this method manually from within your own game. This method is intended to be used by the game engine, which was carefully designed to allow the world to be modified only when safe. Calling this method yourself disables an important safety check. """ if not self._is_locked: yield else: try: self._is_locked = False yield finally: self._is_locked = True
[docs] def _add_token(self, token): require_token(token) assert token.has_id, msg("""\ token {token} should've been assigned an id by Message._assign_token_ids() before World._add_token() was called.""") assert token not in self, msg("""\ Message._assign_token_ids() should've refused to process a token that was already in the world.""") info('adding token to world: {token}') # Add the token to the world. self._tokens[token.id] = token token._add_to_world(self, self._actors) return token
[docs] def _remove_token(self, token): require_active_token(token) info('removing token from world: {token}') id = token.id token._remove_from_world() del self._tokens[id]
[docs] def _get_nested_observers(self): return iter(self)
[docs] def _set_actors(self, actors): """ Tell the world which actors are running on this machine. This information is used to create extensions for new tokens. """ self._actors = actors
[docs]@debug_only def require_token(object): """ Raise an `ApiUsageError` if the given object is not a fully constructed instance of a `Token` subclass. """ require_instance(Token(), object)
[docs]@debug_only def require_active_token(object): """ Raise an `ApiUsageError` if the given object is not a token that is currently participating in the game. To be participating in the game, the given token must have an id number and be associated with the world. """ require_token(object) token = object if not token.has_id: raise ApiUsageError("""\ token {token} should have an id, but doesn't. This error usually means that a token was added to the world without being assigned an id number. To correct this, make sure that you're using a message (i.e. CreateToken) to create all of your tokens.""") if not token.has_world: raise ApiUsageError("""\ token {token} (id={token.id}) not in world. You can get this error if you try to remove the same token from the world twice, e.g. if you don't get rid of every reference to a token after it's removed the first time, then later on try to remove the stale reference.""")
[docs]@debug_only def require_world(object): """ Raise an `ApiUsageError` if the given object is not a fully constructed `World` instance. """ return require_instance(World(), object)