Autoscaling consumers

Hello,

I would like to know if there is a good way to trigger auto-scaling (up and down) for consumers depending on the number of messages in the queue?

Thanks

Ping @rophilogene as already discussed :wink:

Hey @Orkin :wave: , thank you for opening this thread - I will come back to you with a complete response within the week :slight_smile:

Hello @rophilogene, did you have time to check this point?

Thanks

Hi @Orkin, here is a script you could run as a Qovery Cronjob at whatever interval you want (5 minutes is a good default value).

import requests
import os
import time

# --- Qovery API access ---
# The token is read from the environment; it is None when QOVERY_API_TOKEN
# is not set. APPLICATION_ID is a placeholder to replace with the real ID.
API_TOKEN = os.getenv("QOVERY_API_TOKEN")
APPLICATION_ID = "your_application_id_here"
API_BASE_URL = "https://api.qovery.com"

# --- Scaling policy ---
# Bounds the script keeps the instance counts within.
MAX_INSTANCES = 5
MIN_INSTANCES = 1
# Queue size at or below which a scale down is triggered.
SCALE_DOWN_THRESHOLD = 10
# Queue size at or above which a scale up is triggered.
SCALE_UP_THRESHOLD = 100
# Longest time, in seconds, to wait for the environment to reach a final
# state (10 minutes).
MAX_WAIT_TIME = 10 * 60

def get_queue_size():
    """Return the number of messages waiting in the queue.

    Placeholder: yields a random integer between 0 and 200 (inclusive) so
    the rest of the script can be exercised.
    """
    from random import randint

    # TODO: Implement this function to get the actual queue size
    simulated_depth = randint(0, 200)
    return simulated_depth

def get_application_settings():
    """Fetch the current settings of the target application from the API.

    Issues a GET on ``{API_BASE_URL}/application/{APPLICATION_ID}``
    authenticated with ``API_TOKEN``.

    Returns:
        The decoded JSON body when the API answers with HTTP 200, otherwise
        ``None`` (request error, timeout, non-200 status, or a body that is
        not valid JSON). A message is printed in every failure case.
    """
    url = f"{API_BASE_URL}/application/{APPLICATION_ID}"
    headers = {"Authorization": f"Token {API_TOKEN}"}

    try:
        # requests never times out by default; bound the call so a stalled
        # connection cannot hang the whole run.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as exc:
        print(f"Failed to get application settings. Request error: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to get application settings. Status code: {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError:
        # requests' JSON decoding errors derive from ValueError.
        print("Failed to get application settings. Response body is not valid JSON.")
        return None

def update_instance_count(current_settings, new_min, new_max):
    """Push new min/max running-instance counts for the application.

    A shallow copy of ``current_settings`` is sent back with a PUT on
    ``{API_BASE_URL}/application/{APPLICATION_ID}``, with only
    ``min_running_instances`` and ``max_running_instances`` overridden.
    The caller's dict is not modified.

    Args:
        current_settings: Settings dict to use as the base of the payload.
        new_min: Value to send as ``min_running_instances``.
        new_max: Value to send as ``max_running_instances``.

    Returns:
        ``True`` when the API answers with HTTP 200, ``False`` on any other
        status or when the request itself fails.
    """
    url = f"{API_BASE_URL}/application/{APPLICATION_ID}"
    headers = {
        "Authorization": f"Token {API_TOKEN}",
        "Content-Type": "application/json",
    }

    updated_settings = current_settings.copy()
    updated_settings["min_running_instances"] = new_min
    updated_settings["max_running_instances"] = new_max

    try:
        # Bound the call: requests has no default timeout.
        response = requests.put(url, headers=headers, json=updated_settings, timeout=30)
    except requests.RequestException as exc:
        print(f"Failed to update instance count. Request error: {exc}")
        return False

    if response.status_code == 200:
        print(f"Updated instance count: min={new_min}, max={new_max}")
        return True

    print(f"Failed to update instance count. Status code: {response.status_code}")
    return False

def check_environment_status(environment_id):
    """Tell whether the environment is currently in a final state.

    Queries ``{API_BASE_URL}/environment/{environment_id}/status`` and reads
    the ``state`` field of the JSON answer.

    Args:
        environment_id: Identifier of the environment to query.

    Returns:
        ``True`` when ``state`` is one of ``DEPLOYED``, ``RESTARTED`` or
        ``CANCELED``. ``False`` for any other state, a non-200 answer, a
        request error, or an undecodable body, so a polling caller simply
        tries again.
    """
    url = f"{API_BASE_URL}/environment/{environment_id}/status"
    headers = {"Authorization": f"Token {API_TOKEN}"}

    try:
        # Bound the call: requests has no default timeout.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as exc:
        print(f"Failed to get environment status. Request error: {exc}")
        return False

    if response.status_code != 200:
        print(f"Failed to get environment status. Status code: {response.status_code}")
        return False

    try:
        status = response.json().get("state")
    except (ValueError, AttributeError):
        # Body is not JSON, or is JSON but not an object with a "state" key.
        print("Failed to get environment status. Unexpected response body.")
        return False

    return status in ["DEPLOYED", "RESTARTED", "CANCELED"]

def wait_for_environment_ready(environment_id):
    """Poll until the environment reaches a final state or the wait expires.

    Calls ``check_environment_status`` every 10 seconds for at most
    ``MAX_WAIT_TIME`` seconds.

    Args:
        environment_id: Identifier of the environment to poll.

    Returns:
        ``True`` as soon as a check succeeds, ``False`` (after printing a
        timeout message) when the deadline passes first.
    """
    # A monotonic clock keeps the elapsed-time measurement immune to
    # wall-clock adjustments (NTP sync, manual changes) during the wait.
    deadline = time.monotonic() + MAX_WAIT_TIME
    while time.monotonic() < deadline:
        if check_environment_status(environment_id):
            return True
        time.sleep(10)  # Wait for 10 seconds before checking again
    print("Timeout waiting for environment to be in a final state")
    return False

def redeploy_application():
    """Ask the API to redeploy the target application.

    Sends a POST to ``{API_BASE_URL}/application/{APPLICATION_ID}/redeploy``.

    Returns:
        ``True`` when the API acknowledges the request with a 2xx status,
        ``False`` on any other status or when the request itself fails.
    """
    url = f"{API_BASE_URL}/application/{APPLICATION_ID}/redeploy"
    headers = {"Authorization": f"Token {API_TOKEN}"}

    try:
        # Bound the call: requests has no default timeout.
        response = requests.post(url, headers=headers, timeout=30)
    except requests.RequestException as exc:
        print(f"Failed to initiate redeployment. Request error: {exc}")
        return False

    # Accept any 2xx: an asynchronous action may be acknowledged with
    # 202 Accepted rather than 200.
    # NOTE(review): confirm the exact success code in the Qovery API reference.
    if 200 <= response.status_code < 300:
        print("Redeployment initiated successfully")
        return True

    print(f"Failed to initiate redeployment. Status code: {response.status_code}")
    return False

def auto_scale():
    """Run one autoscaling pass for the target application.

    Steps:
      1. Read the queue size and the application's current settings.
      2. Derive new min/max instance counts:
         - queue size >= ``SCALE_UP_THRESHOLD``: raise the minimum by one
           (capped at ``MAX_INSTANCES``) and set the maximum to
           ``MAX_INSTANCES``;
         - queue size <= ``SCALE_DOWN_THRESHOLD``: set the minimum to
           ``MIN_INSTANCES`` and lower the maximum by one (floored at
           ``MIN_INSTANCES``);
         - otherwise do nothing.
      3. If the counts actually change and the update succeeds, wait for
         the environment to reach a final state and trigger a redeploy.

    Returns:
        ``None``. Progress and failures are reported with ``print``.
    """
    queue_size = get_queue_size()
    print(f"Current queue size: {queue_size}")

    current_settings = get_application_settings()
    if not current_settings:
        print("Unable to proceed without current application settings.")
        return

    # "environment" may be missing or null; treat both as "no environment".
    environment_id = (current_settings.get("environment") or {}).get("id")
    if not environment_id:
        print("Unable to find environment ID.")
        return

    current_min = current_settings.get("min_running_instances", MIN_INSTANCES)
    current_max = current_settings.get("max_running_instances", MAX_INSTANCES)

    if queue_size >= SCALE_UP_THRESHOLD:
        new_min = min(current_min + 1, MAX_INSTANCES)
        new_max = MAX_INSTANCES
    elif queue_size <= SCALE_DOWN_THRESHOLD:
        new_min = MIN_INSTANCES
        new_max = max(current_max - 1, MIN_INSTANCES)
    else:
        print("No scaling action required.")
        return

    # Already at the bounds: skip the update so a periodic run does not
    # redeploy the application on every tick for no change.
    if (new_min, new_max) == (current_min, current_max):
        print("Instance counts already at target. No scaling action required.")
        return

    if not update_instance_count(current_settings, new_min, new_max):
        return

    print("Waiting for environment to be in a final state...")
    if wait_for_environment_ready(environment_id):
        print("Environment is ready. Initiating redeployment...")
        redeploy_application()
    else:
        print("Environment did not reach a final state in time. Skipping redeployment.")

if __name__ == "__main__":
    # Script entry point: run a single autoscaling pass, but only when an
    # API token was found in the environment.
    if API_TOKEN:
        auto_scale()
    else:
        print("Please set the QOVERY_API_TOKEN environment variable")

Let me know if you have any questions :slight_smile: I hope it helps.

cc @ce_gagnaire

1 Like

Hello @Orkin ,

Did you find some time to try this script?

Please let me know if you need any help with this topic.

Regards,
Charles-Edouard

Hello @ce_gagnaire, I haven’t had time to try it yet :sweat_smile:.

No worries,

I was just checking to see if you needed any help.

Take your time and ping me if you need help.

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.