June 21, 2019

Simulating a cluster of servers - Pedantic Reinforcement Learning (pt 3)

This post is part of a series. If you haven’t read the introduction yet, you might want to go back and read it so you can understand why we are doing this.

Creating the environment - choosing an action space

Like all reinforcement learning environments, we need to figure out our observation space, action space, and reward function. This post will only deal with the action space. There are a lot of ways to model the action space, and for this project, I experimented with a few before settling on the one I wanted. Here are some examples:

  • At each timestep, our agent can either add a server to the cluster, remove a server from the cluster, or do nothing. The agent has no control over which server gets removed.
    • This is the simplest one I could think of, and while it doesn’t let us do things like spin up multiple servers at once, this trained the fastest and had the simplest strategies.
  • At each timestep, our agent can turn on/off any one of our servers, or do nothing. If there are 10 servers, we can model this with 21 actions (2 for each server representing on/off, and 1 for the nothing action).
    • This gives the agent a lot more control since it can decide exactly which server to turn on or off, but takes longer to train. This approach might be better in a more complex environment. In our case, however, each server is pretty much the same and removing a random server is not that much better than removing a specific server.
  • Similar to the last one, our agent can pick a server and decide that it wants to toggle the state of that server (if it’s on, turn it off and vice versa).
    • This produces similar results to the previous one and has the same pitfalls.

This is a really important part of any project. If you don’t pick a good action space, your agent might not be able to learn what you want it to. We are going to use the first one for this project.

Managing servers

Now, we can build out a few helpful abstractions for managing our servers. I decided to have a class called Cluster responsible for managing our servers and a class called Server responsible for simulating a single server.

Whenever I’m reading some new code, I find it really helpful to first look at how it’s used (either through tests or client code) before I see how it’s implemented. Here’s a basic workflow to show how they should work:

# allow up to 10 servers but they can only handle 1 request at a time
cluster = Cluster(max_num_servers = 10, server_capacity = 1)
assert cluster.get_max_num_servers() == 10


# All servers start off so this request will fail
new_requests = [Request(time = 0, duration = 20)]
errors = cluster.handle_new_requests(new_requests)
assert errors == 1 


assert cluster.add_server()

# Subtle point, we allocate max_num_servers, but keep them all off
# "Adding" a server is equivalent to turning one of them on
# We expect that it's marked as turning on but not on yet, since that isn't instantaneous

selected_server = None
for server in cluster.get_servers():
    if server.is_transitioning():
        assert selected_server == None, "Only one server should be turning on"
        selected_server = server
        
# Wait for the server to turn on by advancing time
while selected_server.get_time_to_transition_states() > 0:
    assert selected_server.is_on() == False, "Server doesn't turn on immediately"
    assert selected_server.is_accepting_requests() == False, \
        "Server can't accept requests before it's on"
    cluster.advance_time()
    
assert selected_server.is_transitioning() == False
assert selected_server.is_on() == True
assert selected_server.is_accepting_requests() == True


# Since the capacity is 1, if we give two requests the first will succeed
# and the second will fail. The cluster is also agnostic to time because
# it is only passed requests that are ready
LONG_REQUEST_DURATION = 200
errors = cluster.handle_new_requests([
    Request(time=0, duration=LONG_REQUEST_DURATION), 
    Request(time=0, duration=LONG_REQUEST_DURATION)
])
assert errors == 1 
assert len(selected_server.active_request_durations) == 1
assert selected_server.active_request_durations[0] == LONG_REQUEST_DURATION


# Turn off the server and verify it doesn't turn off until our request finishes
# It should, however, immediately stop allowing requests
assert cluster.remove_server()
assert selected_server.is_on() == True
assert selected_server.is_accepting_requests() == False
assert selected_server.is_transitioning() == True


# Advance time forward until the request is done, asserting the server is on
for _ in range(LONG_REQUEST_DURATION):
    assert selected_server.is_on()
    cluster.advance_time()

    
# Request is done, server should be dead
assert selected_server.is_on() == False
assert len(server.active_request_durations) == 0

That won’t run just yet because we haven’t defined a Cluster or Server (Request was defined in my last post). Let’s start with Cluster and work our way down. In case it wasn’t obvious before, I’m normally a Java developer.

class Cluster:
    def __init__(self, max_num_servers, server_capacity):
        self.max_num_servers = max_num_servers
        self.server_capacity = server_capacity
        self.reset()
            
    def reset(self):
        self.servers = [
            Server(capacity=self.server_capacity) 
                for _ in range(self.max_num_servers)
        ]
    
    # If all servers are already on or currently turning on or off,
    #   this will do nothing.
    def add_server(self):
        for server in self.servers:
            if not server.is_on() and not server.is_transitioning():
                server.turn_on()
                return True
        return False
    
    # This will pick a random server that we can turn off, and turn it off.
    # If all servers are already off or currently turning on or off,
    #   this will do nothing.
    def remove_server(self):
        eligible_servers = []
        for server in self.servers:
            if server.is_on() and not server.is_transitioning():
                eligible_servers.append(server)
                
        if len(eligible_servers) == 0:
            return False
        
        random.choice(eligible_servers).turn_off()
        return True
    
    def advance_time(self):
        for server in self.servers:
            server.advance_time()
    
    # Routes requests to available servers
    # Returns how many couldn't be routed to any server
    def handle_new_requests(self, new_requests):
        for request_index, new_request in enumerate(new_requests):
            success = self._handle_new_request(new_request)
            
            # We are out of free space and the rest of the 
            # new requests will fail            
            if not success:
                return len(new_requests) - request_index
                
        return 0
    
    # Routes new request to the most free server that's accepting requests
    # Returns false if no servers are free
    def _handle_new_request(self, new_request):
        server_index = self._get_server_index_with_most_free_space()
        if server_index >= 0:
            self.servers[server_index].add_request(new_request)
            return True
        else:
            return False

    # Returns -1 if no servers have any free space
    def _get_server_index_with_most_free_space(self):
        max_free_space = 0.0
        max_free_space_idx = -1
        
        for server_index, server in enumerate(self.servers):
            if not server.is_accepting_requests():
                continue

            free_space = server.get_free_space()
            if free_space > max_free_space:
                free_space = max_free_space
                max_free_space_idx = server_index
                
        return max_free_space_idx
        
    def get_servers(self):
        # Normally this is not a great idea since a client can mess with our
        # internal state, but we're the only client so meh
        return self.servers 
    
    def get_max_num_servers(self):
        return self.max_num_servers

Next up is the server:

MIN_SERVER_START_TIME = 10
MAX_SERVER_START_TIME = 50

class Server:
    def __init__(self, capacity):
        self.on = False
        self.accepting_requests = False
        self.capacity = capacity
        
        # For managing turning on/off
        self.time_to_transition_states = 0
        self.transitioning = False
        
        # We really only care about the durations at this point
        self.active_request_durations = np.array([])
    
    def turn_on(self):
        if self.transitioning:
            raise ValueError("cannot turn on when transitioning states")
        
        self.transitioning = True
        self.time_to_transition_states = \
            random.randint(MIN_SERVER_START_TIME, MAX_SERVER_START_TIME)
        
    def turn_off(self):
        if self.transitioning:
            raise ValueError("cannot turn off when transitioning states")
        
        self.transitioning = True
        # Dies immediately when requests are done
        self.time_to_transition_states = 0 
        self.accepting_requests = False
        
    def add_request(self, req):
        if self.get_free_space() <= 0:
            raise ValueError("server is full")
            
        self.active_request_durations = \
            np.append(self.active_request_durations, req.duration)
        
    def get_free_space(self):
        if self.accepting_requests:
            return self.capacity - len(self.active_request_durations)
        else:
            return 0.0
        
    def advance_time(self):
        self._continue_processing_active_requests()
        self._continue_transitioning_if_necessary()
        
    def _continue_transitioning_if_necessary(self):
        if self.transitioning:
            self.time_to_transition_states = \
                max(0, self.time_to_transition_states - 1)
        
        if self._ready_to_finish_transitioning():
            self.on = not self.on
            self.accepting_requests = self.on
            self.transitioning = False
    
    def _ready_to_finish_transitioning(self):
        if not self.transitioning or self.time_to_transition_states > 0:
            return False
        
        # If we are turning off, wait for all active requests to finish
        if self.on:
            return len(self.active_request_durations) == 0
        else:
            return True
    
    def _continue_processing_active_requests(self):
        self.active_request_durations -= 1
        self.active_request_durations = \
            self.active_request_durations[self.active_request_durations > 0]
        
    def is_on(self):
        return self.on
        
    def is_accepting_requests(self):
        return self.accepting_requests
    
    def is_transitioning(self):
        return self.transitioning
    
    def get_time_to_transition_states(self):
        return self.time_to_transition_states

And now we have all the pieces needed to manage our own cluster and simulate that cluster handling requests. If you go back and run the workflow, you won’t get any assertion errors.

In my next post, we will do some very basic feature engineering to create our observation space (aka deciding what our agent will see in order to control this cluster).