Files
pysatcalc/main.py
2025-11-09 08:52:03 +00:00

460 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from collections import defaultdict, deque
from enum import Enum
from math import ceil
from typing import Dict, List, Tuple, Optional, Any
import networkx as nx
from flask import Flask, render_template, request
from pydantic import BaseModel, Field
from plus import Items, Machines, Recipes, Recipe, RawResources
# from vanilla import Items, Machines, Recipes, Recipe, RawResources
from rich import print
import matplotlib.pyplot as plt
# Module-level Flask application; the route below registers on it and
# create_app() returns this same object.
app = Flask(__name__)
# Helpers to map item names to safe query parameter keys
def _slugify(name: str) -> str:
s = ''.join(ch.lower() if ch.isalnum() else '_' for ch in name)
# collapse consecutive underscores and trim
out = []
prev_us = False
for ch in s:
if ch == '_':
if not prev_us:
out.append('_')
prev_us = True
else:
out.append(ch)
prev_us = False
return ''.join(out).strip('_')
class Production(BaseModel):
    """Pydantic model pairing a recipe with a numeric quantity.

    NOTE(review): this model is not referenced by the other code visible in
    this file, and the meaning/units of ``quantity`` are not shown here;
    confirm against callers before relying on it.
    """

    recipe: Recipe
    quantity: float
class ProductionChain:
    """Expand target item rates into recipe levels and resource totals.

    On construction the class indexes every entry of ``Recipes`` by output
    item and by recipe name. ``compute_chain`` then walks the demand with a
    FIFO queue and stores its results on the instance:

    * ``demand`` / ``production``: per-item totals,
    * ``raw_resources``: net quantity required of each raw resource,
    * ``production_chain``: recipe name -> level (multiplier) of that recipe,
    * ``excess``: production minus demand for demanded, non-raw items
      (a negative value is a deficit),
    * ``byproduct``: produced items that nothing demanded.
    """

    def __init__(self, preferred_recipes: Optional[Dict[Items, Recipe]] = None):
        """Build the recipe lookup tables.

        Args:
            preferred_recipes: optional per-item recipe choice that takes
                precedence over the default (first registered) recipe.
        """
        # Recipes producing each item, in ``Recipes`` iteration order; the
        # first entry is the default used by ``get_recipe``.
        self.item_to_recipies: Dict[Items, List[Recipe]] = defaultdict(list)
        self.recipe_name_to_obj: Dict[str, Recipe] = {}
        for r in Recipes:
            recipe = r.value
            for o in recipe.outputs:
                self.item_to_recipies[o].append(recipe)
            self.recipe_name_to_obj[recipe.name] = recipe
        self.demand: Dict[Items, float] = defaultdict(float)
        self.production: Dict[Items, float] = defaultdict(float)
        self.raw_resources: Dict[Items, float] = defaultdict(float)
        self.production_chain: Dict[str, float] = defaultdict(float)
        self.excess: Dict[Items, float] = defaultdict(float)
        self.byproduct: Dict[Items, float] = defaultdict(float)
        self.preferred_recipes = preferred_recipes or {}

    def get_recipe(self, item: Items) -> Optional[Recipe]:
        """Return the recipe used to make ``item``, or ``None`` if there is none.

        A preferred recipe wins; otherwise the first registered recipe that
        outputs the item is used.
        """
        if item in self.preferred_recipes:
            return self.preferred_recipes[item]
        # ``.get`` avoids inserting a key into the defaultdict, and the
        # truthiness check also covers a key that maps to an empty list.
        candidates = self.item_to_recipies.get(item)
        return candidates[0] if candidates else None

    def calculate_excess(self, production: Dict[Items, float], demand: Dict[Items, float], raw_resources: Dict[Items, float]):
        """Return production minus demand for every demanded, non-raw item.

        For raw resources no excess entry is written; instead the amount
        already produced (e.g. as a byproduct) is deducted from
        ``raw_resources`` in place. Items whose production exactly matches
        demand are omitted from the result.

        Returns:
            ``defaultdict(float)`` mapping item -> surplus (negative = deficit).
        """
        excess = defaultdict(float)
        for item, wanted in demand.items():
            # ``.get`` keeps this working for plain dicts and does not add
            # zero entries to the caller's ``production`` mapping.
            produced = production.get(item, 0.0)
            if item in RawResources:
                # Deduct regardless of whether production equals demand, so a
                # fully covered raw resource ends up with a net need of zero.
                if produced:
                    raw_resources[item] = raw_resources.get(item, 0.0) - produced
                continue
            surplus = produced - wanted
            if surplus != 0:
                excess[item] = surplus
        return excess

    def calculate_byproduct(self, production: Dict[Items, float], demand: Dict[Items, float], raw_resources: Dict[Items, float]):
        """Return produced items that do not appear in ``demand``.

        ``raw_resources`` is accepted for signature symmetry with
        ``calculate_excess`` and is not used.

        Returns:
            ``defaultdict(float)`` mapping item -> produced quantity.
        """
        byproduct = defaultdict(float)
        byproduct.update(
            (item, quantity) for item, quantity in production.items() if item not in demand
        )
        return byproduct

    def compute_chain(self, targets: Dict[Items, float]) -> Any:
        """Resolve ``targets`` (item -> wanted quantity) into recipe levels.

        Items are processed from a FIFO queue. Raw resources just record
        their demand. For any other item the recipe is scaled so that the
        largest outstanding shortfall among its demanded outputs is covered;
        its inputs are then added to the demand and re-queued.

        Items that are neither raw resources nor produced by any recipe are
        skipped: if demanded they surface as a negative ``excess`` entry, if
        only produced they surface in ``byproduct``.

        Returns:
            The recipe-name -> level mapping (also stored on
            ``self.production_chain``), or ``{}`` when ``targets`` is empty,
            in which case instance state is left untouched.
        """
        if not targets:
            return {}
        demand = defaultdict(float, targets)
        queue = deque(targets)
        production = defaultdict(float)
        raw_resources = defaultdict(float)
        production_chain = defaultdict(float)
        while queue:
            item = queue.popleft()
            if item in RawResources:
                # Raw need always mirrors the current total demand.
                raw_resources[item] = demand[item]
                continue
            recipe = self.get_recipe(item)
            if recipe is None:
                # Nothing can craft this item; previously this crashed on
                # ``recipe.outputs``.
                continue
            if item in production and production[item] == demand[item]:
                # Already satisfied exactly. Termination relies on this
                # exact float comparison.
                continue
            levels = [
                (demand[out] - production[out]) / quantity
                for out, quantity in recipe.outputs.items()
                if out in demand
            ]
            if not levels:
                # The chosen recipe yields nothing that is demanded.
                continue
            production_level = max(levels)
            if production_level == 0:
                continue
            production_chain[recipe.name] += production_level
            if production_chain[recipe.name] < 0:
                # A net-negative level means the recipe is not needed at all.
                # NOTE(review): contributions booked by earlier positive
                # passes are not rolled back here - confirm this is intended.
                del production_chain[recipe.name]
                production_level = 0
            for out, quantity in recipe.outputs.items():
                production[out] += production_level * quantity
            for byproduct, quantity in recipe.byproducts.items():
                production[byproduct] += production_level * quantity
                queue.append(byproduct)
            for inp, quantity in recipe.inputs.items():
                queue.append(inp)
                demand[inp] += production_level * quantity
        excess = self.calculate_excess(production, demand, raw_resources)
        byproduct = self.calculate_byproduct(production, demand, raw_resources)
        print("demand:", demand)
        print("production:", production)
        print("excess:", excess)
        print("byproduct:", byproduct)
        print("raw resources:", raw_resources)
        print("production chain:", production_chain)
        self.demand = demand
        self.production = production
        self.raw_resources = raw_resources
        self.production_chain = production_chain
        self.excess = excess
        self.byproduct = byproduct
        # self.generate_graph(production_chain, production, raw_resources, excess, byproduct)
        return production_chain

    def generate_graph(self, production_chain, production, raw_resources, excess, byproduct):
        """Draw the recipe dependency graph with networkx/matplotlib.

        Each recipe in ``production_chain`` becomes a node with an edge to
        the raw resource, or to the recipe supplying each of its inputs.
        Only ``production_chain`` is read; the other arguments are unused.
        Opens a matplotlib window via ``plt.show()``.
        """
        nx_graph = nx.DiGraph()
        for recipe in production_chain:
            nx_graph.add_node(recipe)
            for inp in self.recipe_name_to_obj[recipe].inputs:
                if inp in RawResources:
                    nx_graph.add_edge(recipe, inp.name)
                    continue
                input_recipe = self.get_recipe(inp)
                if input_recipe is None:
                    # No known producer for this input; nothing to link to.
                    continue
                nx_graph.add_edge(recipe, input_recipe.name)
        nx.draw_spring(nx_graph, with_labels=True, font_weight='bold')
        plt.show()
@app.route("/", methods=["GET"])
def index():
    """Render the calculator page for the targets given in the query string.

    Query parameters read:

    * ``item_<n>`` / ``rate_<n>``: indexed target rows (multi-product form),
    * ``item`` / ``rate``: legacy single-target form, used when no indexed
      rows are present,
    * ``recipe``: recipe for the target when there is at most one target,
    * ``recipe_for_<slug>``: per-item recipe override, where ``<slug>`` is
      ``_slugify`` of the item's display name.

    With no query parameters at all, a single default row (first item in
    name order, rate 60.0) is computed. Invalid rows are reported through
    the ``error`` template variable instead of raising.
    """
    # Function-scope imports keep this handler self-contained.
    from math import isfinite
    from urllib.parse import urlencode

    # Build selectable items list from Items enum (display names)
    item_names = sorted(i.value.name for i in Items)
    name_to_item = {i.value.name: i for i in Items}
    result = None
    error_msgs: List[str] = []

    # Index recipes by output item once instead of rescanning Recipes for
    # every lookup below. Order follows Recipes iteration order.
    recipes_by_output: Dict[Items, List[Recipe]] = defaultdict(list)
    for r in Recipes:
        for out_item in r.value.outputs:
            recipes_by_output[out_item].append(r.value)

    # --------------------
    # Parse targets (support multiple products)
    # --------------------
    # Accept both legacy single `item`/`rate` and indexed `item_1`/`rate_1`, ...
    # Only indices that actually occur in the query are visited, so a huge
    # index such as `item_999999999` cannot force a huge allocation. As
    # before, the highest `item_<n>` bounds which rows are considered.
    item_indices = set()
    rate_indices = set()
    for key in request.args.keys():
        for prefix, bucket in (("item_", item_indices), ("rate_", rate_indices)):
            if not key.startswith(prefix):
                continue
            try:
                bucket.add(int(key[len(prefix):]))
            except ValueError:
                pass
    max_idx = max(item_indices, default=0)
    row_indices = sorted(i for i in item_indices | rate_indices if 1 <= i <= max_idx)
    indexed_pairs: List[Tuple[Optional[str], Optional[str]]] = [
        (request.args.get(f"item_{i}"), request.args.get(f"rate_{i}"))
        for i in row_indices
    ]

    # Build targets_ui (for rendering the form) and targets dict (for compute)
    targets_ui: List[dict] = []
    targets: Dict[Items, float] = defaultdict(float)

    def _add_target_row(item_name: Optional[str], rate_str: Optional[str]):
        """Validate one form row; record it for the UI and, if valid, as a target."""
        if not item_name and not rate_str:
            return
        item_name = (item_name or "").strip()
        rate_val: Optional[float] = None
        if rate_str is not None and rate_str != "":
            try:
                rate_val = float(rate_str)
                # float() also accepts "nan"/"inf"; those would never satisfy
                # the solver's equality-based stop condition, so reject them.
                if rate_val < 0 or not isfinite(rate_val):
                    raise ValueError
            except (TypeError, ValueError):
                error_msgs.append(f"Invalid rate for product '{item_name or '(missing)'}'. Enter a non-negative number.")
                rate_val = None
        if item_name:
            targets_ui.append({"item": item_name, "rate": rate_val if rate_val is not None else 0.0})
            if rate_val is not None:
                item_obj = name_to_item.get(item_name)
                if item_obj is None:
                    error_msgs.append(f"Unknown item '{item_name}'.")
                else:
                    targets[item_obj] += rate_val
        elif rate_val is not None:
            # Rate without item name just keep it in UI
            targets_ui.append({"item": "", "rate": rate_val})

    if indexed_pairs:
        for item_nm, rate_s in indexed_pairs:
            _add_target_row(item_nm, rate_s)
    else:
        # Legacy single controls
        default_item = item_names[0] if item_names else ""
        single_item = request.args.get("item") or default_item
        single_rate = request.args.get("rate")
        _add_target_row(single_item, single_rate)
        if not request.args:
            # Initial load: ensure at least one default row
            targets_ui = [{"item": default_item, "rate": 60.0}]
            if default_item:
                item_obj0 = name_to_item.get(default_item)
                if item_obj0 is not None:
                    targets[item_obj0] += 60.0

    # Selected defaults are taken from the first row (for compatibility with existing template pieces)
    selected_item = targets_ui[0]["item"] if targets_ui else (item_names[0] if item_names else "")
    selected_rate = targets_ui[0]["rate"] if targets_ui else 60.0

    # Optional top-level recipe (applies only when exactly one target)
    selected_recipe = request.args.get("recipe") or ""

    # Parse per-item recipe overrides from query params recipe_for_<slug(item)>
    slug_to_item: Dict[str, Items] = {_slugify(i.value.name): i for i in Items}
    overrides: Dict[Items, str] = {}
    for key, value in request.args.items():
        if not key.startswith("recipe_for_"):
            continue
        if value is None or value == "":
            continue
        item_enum = slug_to_item.get(key[len("recipe_for_"):])
        if item_enum is None:
            continue
        # Accept the override only if that recipe really outputs the item
        valid_names = [rec.name for rec in recipes_by_output.get(item_enum, ())]
        if value in valid_names:
            overrides[item_enum] = value

    # Candidate recipes for the (first) selected item (single-target convenience)
    recipe_options: List[str] = []
    item_obj_for_options = name_to_item.get(selected_item) if len(targets) <= 1 else None
    if item_obj_for_options is not None:
        recipe_options = sorted(
            rec.name for rec in recipes_by_output.get(item_obj_for_options, ())
        )
        # Validate selected_recipe against available options
        if selected_recipe not in recipe_options:
            selected_recipe = ""
    else:
        selected_recipe = ""

    # Build preferred map merging top-level selection and overrides
    preferred: Optional[Dict[Items, str]] = None
    if selected_recipe or overrides:
        preferred = dict(overrides)
        if selected_recipe and item_obj_for_options is not None:
            # The top-level selection wins over an override for the same item
            preferred[item_obj_for_options] = selected_recipe

    # Compute and also prepare per-item override options based on resulting chain
    overrides_ui: List[dict] = []
    if targets:
        # Translate preferred mapping (Items -> recipe name) into objects and pass to ProductionChain
        preferred_recipes_obj: Optional[Dict[Items, Recipe]] = None
        if preferred:
            name_to_recipe = {r.value.name: r.value for r in Recipes}
            preferred_recipes_obj = {}
            for itm, rec_name in preferred.items():
                rec_obj = name_to_recipe.get(rec_name)
                if rec_obj:
                    preferred_recipes_obj[itm] = rec_obj

        prod_chain = ProductionChain(preferred_recipes=preferred_recipes_obj)
        prod_chain.compute_chain(targets)

        # Build UI-facing structures from ProductionChain state
        raw = {itm.value.name: qty for itm, qty in prod_chain.raw_resources.items() if qty > 0}
        unused = {itm.value.name: qty for itm, qty in prod_chain.byproduct.items() if qty > 0}
        excess = {itm.value.name: qty for itm, qty in prod_chain.excess.items() if qty > 0}
        outputs = {itm.value.name: qty for itm, qty in prod_chain.production.items() if qty > 0}

        # Steps per recipe
        steps = []
        for recipe_name, level in prod_chain.production_chain.items():
            rec = prod_chain.recipe_name_to_obj.get(recipe_name)
            if not rec:
                continue
            if not rec.outputs:
                continue
            # Represent the step by its most-demanded output; max() returns
            # the first maximal item, matching a stable descending sort.
            chosen_item = max(rec.outputs.keys(), key=lambda it: prod_chain.demand.get(it, 0.0))
            if not chosen_item:
                continue
            per_building_output = rec.outputs[chosen_item]
            buildings_float = float(level)
            buildings = ceil(buildings_float) if buildings_float > 0 else 0
            utilization = (buildings_float / buildings) if buildings > 0 else 0.0
            target_rate_item = buildings_float * per_building_output
            # Compute per-step input item rates for this recipe at the computed level
            step_inputs = [
                {"item": inp_item.value.name, "rate": buildings_float * qty_per_building}
                for inp_item, qty_per_building in rec.inputs.items()
            ]
            # Sort inputs by descending rate for a stable display
            step_inputs.sort(key=lambda x: x["rate"], reverse=True)
            steps.append({
                "item": chosen_item.value.name,
                "recipe": rec.name,
                "building": rec.building.value.name,
                "target_rate": target_rate_item,
                "per_building_output": per_building_output,
                "buildings_float": buildings_float,
                "buildings": buildings,
                "utilization": utilization,
                "inputs": step_inputs,
            })

        # Build consumers index per item from the steps' inputs
        consumers_by_item: Dict[str, List[dict]] = defaultdict(list)
        for s in steps:
            for inp in s.get("inputs", []):
                consumers_by_item[inp["item"]].append({
                    "recipe": s["recipe"],
                    "building": s["building"],
                    "rate": inp["rate"],
                })

        # Positive targets by display name; used both for the "Final output"
        # destination annotation and for the result summary.
        target_rates_by_item: Dict[str, float] = {
            itm.value.name: qty for itm, qty in targets.items() if qty > 0
        }

        # Attach destinations to each step (who consumes this step's primary output)
        for s in steps:
            item_name = s["item"]
            dests = list(consumers_by_item.get(item_name, []))
            # If this item is also a final target, add a synthetic destination
            if item_name in target_rates_by_item:
                dests.append({
                    "recipe": "Final output",
                    "building": "",
                    "rate": target_rates_by_item[item_name],
                })
            # Sort destinations by descending rate for display
            dests.sort(key=lambda x: x["rate"], reverse=True)
            s["destinations"] = dests

        result = {
            "targets": dict(target_rates_by_item),
            "raw": raw,
            "steps": steps,
            "outputs": outputs,
            "unused": unused,
            "excess": excess,
        }

        # Collect unique output items from steps for override options
        unique_items = []
        seen = set()
        for s in steps:
            item_nm = s.get("item")
            if item_nm and item_nm not in seen:
                seen.add(item_nm)
                unique_items.append(item_nm)

        for item_nm in unique_items:
            item_enum2 = name_to_item.get(item_nm)
            if item_enum2 is None:
                continue
            candidates = [
                {"name": rec.name, "building": rec.building.value.name}
                for rec in recipes_by_output.get(item_enum2, ())
            ]
            # An override selector is only useful when there is a choice
            if len(candidates) <= 1:
                continue
            candidates.sort(key=lambda x: x["name"])
            sel = None
            if preferred and item_enum2 in preferred:
                sel = preferred[item_enum2]
            overrides_ui.append({
                "item_name": item_nm,
                "slug": _slugify(item_nm),
                "options": candidates,
                "selected": sel or "",
            })

    # Build reset query (clear overrides) while preserving current targets.
    # urlencode escapes names containing spaces, '&', '#', ':' and similar.
    query_pairs: List[Tuple[str, Any]] = []
    if indexed_pairs or len(targets_ui) > 1:
        # Multi-target
        for idx, row in enumerate(targets_ui, start=1):
            query_pairs.append((f"item_{idx}", row["item"]))
            query_pairs.append((f"rate_{idx}", row["rate"]))
    else:
        query_pairs.append(("item", selected_item))
        query_pairs.append(("rate", selected_rate))
    if selected_recipe:
        query_pairs.append(("recipe", selected_recipe))
    reset_query = "?" + urlencode(query_pairs)

    # Combine error messages
    error = " ".join(error_msgs) if error_msgs else None

    return render_template(
        "index.html",
        items=item_names,
        result=result,
        error=error,
        selected_item=selected_item,
        selected_rate=selected_rate,
        recipe_options=recipe_options,
        selected_recipe=selected_recipe,
        overrides_ui=overrides_ui,
        reset_query=reset_query,
        targets_ui=targets_ui,
    )
def create_app():
    """Return the module-level Flask application object ``app``."""
    return app
if __name__ == "__main__":
    # For local dev: python main.py
    # NOTE(review): debug=True enables the interactive Werkzeug debugger and
    # host="0.0.0.0" listens on every network interface; together they let
    # anyone who can reach port 5000 execute code. Keep this to trusted
    # networks, or bind to 127.0.0.1 / disable debug.
    app.run(host="0.0.0.0", port=5000, debug=True)
    # prod_chain = ProductionChain()
    # prod_chain.compute_chain({Items.CateriumHeatsink: 10.0})