This commit is contained in:
2025-11-12 17:18:16 +00:00
parent 9f4ff856e6
commit 67ade78747
3 changed files with 652 additions and 61 deletions

444
main.py
View File

@@ -1,11 +1,11 @@
from collections import defaultdict, deque
from enum import Enum
from math import ceil
from typing import Dict, List, Tuple, Optional, Any, Deque
from typing import Dict, List, Tuple, Optional, Any, Deque, Annotated
import networkx as nx
from flask import Flask, render_template, request
from pydantic import BaseModel, Field
from plus import Items, Machines, Recipes, Recipe, RawResources
from flask import Flask, render_template, request, jsonify
from pydantic import BaseModel, Field, computed_field, BeforeValidator
from plus import Items, Machines, Recipes, Recipe, RawResources, MachineOrder
# from vanilla import Items, Machines, Recipes, Recipe, RawResources
from rich import print
import matplotlib.pyplot as plt
@@ -29,46 +29,69 @@ def _slugify(name: str) -> str:
prev_us = False
return ''.join(out).strip('_')
class Production(BaseModel):
recipe: Recipe
production_level: float
ingress: Dict[Items, "ProductionLink"] = Field(default_factory=dict)
egress: Dict[Items, "ProductionLink"] = Field(default_factory=dict)
@property
def inputs(self):
return {
item: quantity*self.production_level for item, quantity in self.recipe.inputs.items()
}
@property
def outputs(self):
return {
item: quantity*self.production_level for item, quantity in self.recipe.outputs.items()
}
def demand_satisfied(self):
if self.production_level <= 0:
return True
demand = self.recipe.inputs
for item in self.ingress:
demand[item] -= self.ingress[item].quantity
return all(demand[item] <= 0 for item in demand)
def remaining_output(self):
outputs = self.outputs
for item in self.egress:
outputs[item] -= self.egress[item].quantity
return outputs
class ProductionLink(BaseModel):
item: Items
quantity: float
production: Optional[Production]
raw: bool = False
def positive_float(number: float) -> float:
return max(0.0, number)
class Production(BaseModel):
recipe: Recipe
ingress: Dict[Items, List[ProductionLink]] = Field(default_factory=lambda : defaultdict(list))
egress: Dict[Items, List[ProductionLink]] = Field(
default_factory=lambda: defaultdict(list)
)
demand: Dict[Items, Annotated[float, BeforeValidator(positive_float)]] = Field(default_factory=lambda: defaultdict(float))
@computed_field
@property
def production_level(self) -> float:
levels = []
for item, quantity in self.demand.items():
levels.append(quantity/(self.recipe.outputs|self.recipe.byproducts)[item])
return max(levels) if levels else 0.0
@computed_field
@property
def inputs(self) -> Dict[Items, float]:
return {
item: quantity*self.production_level for item, quantity in self.recipe.inputs.items()
}
@computed_field
@property
def outputs(self) -> Dict[Items, float]:
return {
item: quantity*self.production_level for item, quantity in self.recipe.outputs.items() | self.recipe.byproducts.items()
}
def demand_satisfied(self):
if self.production_level <= 0:
return True
current_demand = self.demand.copy()
for item in self.ingress:
for link in self.ingress[item]:
current_demand[item] -= link.quantity
return all(current_demand[item] <= 0 for item in current_demand)
def remaining_output(self):
outputs = self.outputs
for item in self.egress:
for link in self.egress[item]:
outputs[item] -= link.quantity
return outputs
def remaining_input(self):
inputs = self.inputs
for item in self.ingress:
for link in self.ingress[item]:
inputs[item] -= link.quantity
return inputs
class ProductionChain:
@@ -188,9 +211,10 @@ class ProductionChain:
queue: Deque[Items] = deque()
for target in targets:
recipe = self.get_recipe(target)
p = Production(recipe=recipe, production_level=targets[target]/ recipe.outputs[target])
for inp in p.inputs:
queue.append(inp)
p = Production(recipe=recipe)
p.demand[target] = targets[target]
p.egress[target].append(ProductionLink(item=target, quantity=targets[target], raw=True, production=None))
queue.extend(p.inputs)
for output in p.outputs:
item_to_production[output].append(p)
while queue:
@@ -198,29 +222,68 @@ class ProductionChain:
if item in item_to_production or item in RawResources:
continue
recipe = self.get_recipe(item)
p = Production(recipe=recipe, production_level=0)
p = Production(recipe=recipe)
for output in p.outputs:
item_to_production[output].append(p)
for inp in p.inputs:
queue.append(inp)
queue.extend(p.inputs)
calc_queue: Deque[Production] = deque()
for item in item_to_production:
for production in item_to_production[item]:
if not production.demand_satisfied():
calc_queue.append(production)
print(calc_queue)
while calc_queue:
prod = calc_queue.popleft()
if prod.demand_satisfied():
continue
for inp in prod.inputs:
if inp in RawResources:
prod.ingress.append(ProductionLink(item=inp, quantity=prod.recipe.inputs[inp] - prod..raw_resources[inp], production=None, raw=True))
if prod.remaining_input()[inp] <= 0:
continue
prod.ingress[inp].append(ProductionLink(item=inp, quantity=prod.remaining_input()[inp], production=None, raw=True))
else:
for p in item_to_production[inp]:
if p.demand_satisfied():
prod.production_level += p.production_level * p.recipe.outputs[inp] / p.recipe.inputs[inp]
prod_list = item_to_production[inp]
prod_source = None
if len(prod_list) == 1:
prod_source = prod_list[0]
else:
for prod_list_item in prod_list:
if self.get_recipe(inp) == prod_list_item.recipe:
prod_source = prod_list_item
break
if not prod_source:
continue
current_quantity = prod_source.remaining_output()[inp]
target_quantity = prod.remaining_input()[inp]
if current_quantity >= target_quantity:
prod.ingress[inp].append(
ProductionLink(
item=inp,
quantity=target_quantity,
production=prod_source,
raw=False,
)
)
prod_source.egress[inp].append(
ProductionLink(
item=inp,
quantity=target_quantity,
production=prod,
raw=False,
)
)
else:
pass
if target_quantity > 0:
prod_source.demand[inp] += target_quantity
prod.ingress[inp].append(ProductionLink(item=inp, quantity=target_quantity, production=prod_source, raw=False))
prod_source.egress[inp].append(ProductionLink(item=inp, quantity=target_quantity, production=prod, raw=False))
calc_queue.append(prod_source)
return item_to_production
# print("item_to_production:", item_to_production)
@@ -710,13 +773,290 @@ def index():
)
@app.route("/new", methods=["GET"])
def new():
# Render a page similar to index but powered by ProductionChain.build_chain
item_names = sorted([i.value.name for i in Items])
name_to_item = {i.value.name: i for i in Items}
# Parse multiple targets (item_1/rate_1, ...). Fallback to legacy item/rate
indexed_pairs: List[Tuple[Optional[str], Optional[str]]] = []
max_idx = 0
for key in request.args.keys():
if key.startswith("item_"):
try:
idx = int(key.split("_", 1)[1])
max_idx = max(max_idx, idx)
except ValueError:
pass
for i in range(1, max_idx + 1):
indexed_pairs.append((request.args.get(f"item_{i}"), request.args.get(f"rate_{i}")))
targets: Dict[Items, float] = defaultdict(float)
targets_ui: List[dict] = []
errors: List[str] = []
def _add_row(item_name: Optional[str], rate_str: Optional[str]):
nonlocal targets, targets_ui
if not item_name and not rate_str:
return
item_name = (item_name or "").strip()
rate_val: Optional[float] = None
if rate_str is not None and rate_str != "":
try:
rate_val = float(rate_str)
if rate_val < 0:
raise ValueError
except (TypeError, ValueError):
errors.append(f"Invalid rate for product '{item_name or '(missing)'}'. Enter a non-negative number.")
rate_val = None
if item_name:
targets_ui.append({"item": item_name, "rate": rate_val if rate_val is not None else 0.0})
if rate_val is not None:
item_obj = name_to_item.get(item_name)
if item_obj is None:
errors.append(f"Unknown item '{item_name}'.")
else:
targets[item_obj] += rate_val
elif rate_val is not None:
# Rate without item name keep in UI only
targets_ui.append({"item": "", "rate": rate_val})
if indexed_pairs:
for item_nm, rate_s in indexed_pairs:
_add_row(item_nm, rate_s)
else:
# Legacy single controls
default_item = item_names[0] if item_names else ""
single_item = request.args.get("item") or default_item
single_rate = request.args.get("rate")
_add_row(single_item, single_rate)
if not request.args:
# Initial load: ensure default row
targets_ui = [{"item": default_item, "rate": 60.0}]
if default_item:
item_obj0 = name_to_item.get(default_item)
if item_obj0:
targets[item_obj0] += 60.0
# Build reset query preserving current targets
if indexed_pairs or len(targets_ui) > 1:
parts = []
for idx, row in enumerate(targets_ui, start=1):
parts.append(f"item_{idx}={row['item']}")
parts.append(f"rate_{idx}={row['rate']}")
reset_query = "?" + "&".join(parts)
else:
selected_item = targets_ui[0]["item"] if targets_ui else (item_names[0] if item_names else "")
selected_rate = targets_ui[0]["rate"] if targets_ui else 60.0
reset_query = f"?item={selected_item}&rate={selected_rate}"
error = " ".join(errors) if errors else None
# Parse per-item recipe overrides from query params: recipe_for_<slug(item name)>
# Build slug -> Items map and recipe name -> Recipe map
slug_to_item: Dict[str, Items] = { _slugify(i.value.name): i for i in Items }
recipe_name_to_obj: Dict[str, Recipe] = {}
for r in Recipes:
recipe_name_to_obj[r.value.name] = r.value
preferred: Dict[Items, Recipe] = {}
for key, value in request.args.items():
if not key.startswith("recipe_for_"):
continue
if not value:
# Empty value means no override for this item
continue
slug = key[len("recipe_for_") :]
item_enum = slug_to_item.get(slug)
if not item_enum:
continue
# Validate that the provided recipe name is a candidate for this item
candidates = []
for r in Recipes:
rec = r.value
if item_enum in rec.outputs:
candidates.append(rec.name)
if value in candidates:
preferred[item_enum] = recipe_name_to_obj[value]
# Build chain using the object-oriented builder
# Now organize the UI data by production (recipe/building instance) rather than by item
chain_ui: List[dict] = []
# Additionally, build a separate table for alternate recipes organized by item
alternates_by_item_ui: List[dict] = []
if not errors and targets:
# Pass preferred recipes so alternates can be selected
prod_chain = ProductionChain(preferred_recipes=preferred)
item_to_production = prod_chain.build_chain(targets)
def _it_name(i: Items) -> str:
return i.value.name
# Collect unique Production objects (they may appear under multiple output items)
seen_ids: set[int] = set()
productions: List[Production] = []
for plist in item_to_production.values():
for p in plist:
pid = id(p)
if pid in seen_ids:
continue
seen_ids.add(pid)
productions.append(p)
# Sort productions by MachineOrder (then by recipe name for stability)
order_index_map = {m: i for i, m in enumerate(MachineOrder)}
def _prod_sort_key(pr: Production):
b = pr.recipe.building
# Fallback to the end if a building isn't listed in MachineOrder
order_idx = order_index_map.get(b, len(MachineOrder))
return (order_idx, pr.recipe.name)
productions.sort(key=_prod_sort_key)
chain_ui = []
used_slugs: set[str] = set()
for p in productions:
# Choose an anchor output item to bind alternate recipe selection.
# Prefer items that actually egress (flow to another production or final output),
# falling back to any output if there is no egress.
egress_items = list(p.egress.keys())
output_items = list(p.outputs.keys())
anchor_item: Optional[Items] = None
pick_from: List[Items] = egress_items if egress_items else output_items
if pick_from:
anchor_item = sorted(pick_from, key=lambda i: i.value.name)[0]
# Build candidate recipes for the anchor item
candidates: List[dict] = []
selected_recipe_name = p.recipe.name
recipe_slug = _slugify(anchor_item.value.name) if anchor_item else ""
if anchor_item is not None:
for r in Recipes:
rec = r.value
if anchor_item in rec.outputs:
candidates.append({
"name": rec.name,
"building": rec.building.value.name,
})
candidates.sort(key=lambda x: (x["name"]))
# Only show the selector once per anchor item to avoid duplicate keys in the form
if recipe_slug in used_slugs:
# clear candidates and slug so template renders plain text
candidates = []
recipe_slug = ""
else:
used_slugs.add(recipe_slug)
# Build ingress/egress presentation lists
ingress_ui: List[dict] = []
for it, links in p.ingress.items():
for lk in links:
if lk.raw or lk.production is None:
via = "Raw"
else:
via = f"{lk.production.recipe.name} ({lk.production.recipe.building.value.name})"
ingress_ui.append({
"item": _it_name(it),
"rate": lk.quantity,
"via": via,
})
egress_ui: List[dict] = []
for it, links in p.egress.items():
for lk in links:
if lk.production is None:
to = "Final Output"
else:
to = f"{lk.production.recipe.name} ({lk.production.recipe.building.value.name})"
egress_ui.append({
"item": _it_name(it),
"rate": lk.quantity,
"to": to,
})
chain_ui.append(
{
"recipe": p.recipe.name,
"building": p.recipe.building.value.name,
"production_level": p.production_level,
"inputs": [
{"item": _it_name(i), "rate": q} for i, q in p.inputs.items()
],
"outputs": [
{"item": _it_name(i), "rate": q} for i, q in p.outputs.items()
],
# Link visualization
"ingress": ingress_ui,
"egress": egress_ui,
# Deprecated in UI: per-production alternate selector (kept for reference)
"recipe_options": candidates,
"recipe_selected": selected_recipe_name,
"recipe_slug": recipe_slug,
}
)
# Build the alternate-recipe table data organized by item.
# Consider items that appear as outputs in the current chain.
items_in_chain: set[Items] = set()
for p in productions:
for it in p.outputs.keys():
items_in_chain.add(it)
# Helper: map current chain's selected recipe for an item (if any)
current_selected_by_item: Dict[Items, str] = {}
for p in productions:
for it in p.outputs.keys():
# if multiple productions output same item, first one wins (they should be consistent)
current_selected_by_item.setdefault(it, p.recipe.name)
# Build candidate lists and choose selected value
for item in sorted(items_in_chain, key=lambda i: i.value.name):
candidates: List[dict] = []
for r in Recipes:
rec = r.value
if item in rec.outputs:
candidates.append({
"name": rec.name,
"building": rec.building.value.name,
})
# Only include items that actually have alternates (2 or more options)
if len(candidates) <= 1:
continue
candidates.sort(key=lambda x: x["name"])
# Selected recipe prioritizes explicit override if provided; otherwise use what chain used
selected_name = None
if item in preferred:
selected_name = preferred[item].name
else:
selected_name = current_selected_by_item.get(item)
alternates_by_item_ui.append({
"item": item.value.name,
"slug": _slugify(item.value.name),
"options": candidates,
"selected": selected_name,
})
return render_template(
"new.html",
items=item_names,
targets_ui=targets_ui,
error=error,
chain=chain_ui,
reset_query=reset_query,
alternates_by_item=alternates_by_item_ui,
)
def create_app():
return app
if __name__ == "__main__":
# For local dev: python main.py
# app.run(host="0.0.0.0", port=5000, debug=True)
prod_chain = ProductionChain()
# prod_chain.compute_chain({Items.FusedModularFrame: 1.5})
prod_chain.build_chain({Items.IronIngot: 1.5})
app.run(host="0.0.0.0", port=5000, debug=True)
# prod_chain = ProductionChain()
# chain = prod_chain.build_chain({Items.BrassBeltDrive: 10.0})
# print(chain)
# prod_chain.compute_chain({Items.FusedModularFrame: 1.5})