update
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
import gc
|
||||
from typing import List, Dict
|
||||
|
||||
import torch
|
||||
from loguru import logger
|
||||
|
||||
from lama_cleaner.const import DEFAULT_SD_CONTROLNET_METHOD
|
||||
from lama_cleaner.download import scan_models
|
||||
from lama_cleaner.helper import switch_mps_device
|
||||
from lama_cleaner.model import models, ControlNet, SD, SDXL
|
||||
@@ -18,6 +18,11 @@ class ModelManager:
|
||||
self.kwargs = kwargs
|
||||
self.available_models: Dict[str, ModelInfo] = {}
|
||||
self.scan_models()
|
||||
|
||||
self.sd_controlnet = kwargs.get("sd_controlnet", False)
|
||||
self.sd_controlnet_method = kwargs.get(
|
||||
"sd_controlnet_method", DEFAULT_SD_CONTROLNET_METHOD
|
||||
)
|
||||
self.model = self.init_model(name, device, **kwargs)
|
||||
|
||||
def init_model(self, name: str, device, **kwargs):
|
||||
@@ -28,12 +33,17 @@ class ModelManager:
|
||||
raise NotImplementedError(f"Unsupported model: {name}")
|
||||
|
||||
model_info = self.available_models[name]
|
||||
kwargs = {**kwargs, "model_info": model_info}
|
||||
sd_controlnet_enabled = kwargs.get("sd_controlnet", False)
|
||||
kwargs = {
|
||||
**kwargs,
|
||||
"model_info": model_info,
|
||||
"sd_controlnet": self.sd_controlnet,
|
||||
"sd_controlnet_method": self.sd_controlnet_method,
|
||||
}
|
||||
|
||||
if model_info.model_type in [ModelType.INPAINT, ModelType.DIFFUSERS_OTHER]:
|
||||
return models[name](device, **kwargs)
|
||||
|
||||
if sd_controlnet_enabled:
|
||||
if self.sd_controlnet:
|
||||
return ControlNet(device, **kwargs)
|
||||
else:
|
||||
if model_info.model_type in [
|
||||
@@ -51,7 +61,7 @@ class ModelManager:
|
||||
raise NotImplementedError(f"Unsupported model: {name}")
|
||||
|
||||
def __call__(self, image, mask, config: Config):
|
||||
self.switch_controlnet_method(control_method=config.controlnet_method)
|
||||
self.switch_controlnet_method(config)
|
||||
self.enable_disable_freeu(config)
|
||||
self.enable_disable_lcm_lora(config)
|
||||
return self.model(image, mask, config)
|
||||
@@ -66,40 +76,56 @@ class ModelManager:
|
||||
return
|
||||
|
||||
old_name = self.name
|
||||
old_sd_controlnet_method = self.sd_controlnet_method
|
||||
self.name = new_name
|
||||
|
||||
if (
|
||||
self.available_models[new_name].support_controlnet
|
||||
and self.sd_controlnet_method
|
||||
not in self.available_models[new_name].controlnets
|
||||
):
|
||||
self.sd_controlnet_method = self.available_models[new_name].controlnets[0]
|
||||
try:
|
||||
if torch.cuda.memory_allocated() > 0:
|
||||
# Clear current loaded model from memory
|
||||
torch.cuda.empty_cache()
|
||||
del self.model
|
||||
gc.collect()
|
||||
del self.model
|
||||
torch_gc()
|
||||
|
||||
self.model = self.init_model(
|
||||
new_name, switch_mps_device(new_name, self.device), **self.kwargs
|
||||
)
|
||||
except Exception as e:
|
||||
self.name = old_name
|
||||
self.sd_controlnet_method = old_sd_controlnet_method
|
||||
logger.info(f"Switch model from {old_name} to {new_name} failed, rollback")
|
||||
self.model = self.init_model(
|
||||
old_name, switch_mps_device(old_name, self.device), **self.kwargs
|
||||
)
|
||||
raise e
|
||||
|
||||
def switch_controlnet_method(self, control_method: str):
|
||||
if not self.kwargs.get("sd_controlnet"):
|
||||
return
|
||||
if self.kwargs["sd_controlnet_method"] == control_method:
|
||||
return
|
||||
|
||||
def switch_controlnet_method(self, config):
|
||||
if not self.available_models[self.name].support_controlnet:
|
||||
return
|
||||
|
||||
del self.model
|
||||
torch_gc()
|
||||
if self.sd_controlnet != config.controlnet_enabled or (
|
||||
self.sd_controlnet and self.sd_controlnet_method != config.controlnet_method
|
||||
):
|
||||
# 可能关闭/开启 controlnet
|
||||
# 可能开启了 controlnet,切换 controlnet 的方法
|
||||
old_sd_controlnet = self.sd_controlnet
|
||||
old_sd_controlnet_method = self.sd_controlnet_method
|
||||
self.sd_controlnet = config.controlnet_enabled
|
||||
self.sd_controlnet_method = config.controlnet_method
|
||||
|
||||
old_method = self.kwargs["sd_controlnet_method"]
|
||||
self.kwargs["sd_controlnet_method"] = control_method
|
||||
self.model = self.init_model(
|
||||
self.name, switch_mps_device(self.name, self.device), **self.kwargs
|
||||
)
|
||||
logger.info(f"Switch ControlNet method from {old_method} to {control_method}")
|
||||
self.model = self.init_model(
|
||||
self.name, switch_mps_device(self.name, self.device), **self.kwargs
|
||||
)
|
||||
if not config.controlnet_enabled:
|
||||
logger.info(f"Disable controlnet")
|
||||
elif old_sd_controlnet_method != config.controlnet_method:
|
||||
logger.info(
|
||||
f"Switch Controlnet method from {old_sd_controlnet_method} to {config.controlnet_method}"
|
||||
)
|
||||
else:
|
||||
logger.info(f"Enable controlnet: {config.controlnet_method}")
|
||||
|
||||
def enable_disable_freeu(self, config: Config):
|
||||
if str(self.model.device) == "mps":
|
||||
@@ -120,7 +146,7 @@ class ModelManager:
|
||||
def enable_disable_lcm_lora(self, config: Config):
|
||||
if self.available_models[self.name].support_lcm_lora:
|
||||
if config.sd_lcm_lora:
|
||||
if not self.model.model.pipe.get_list_adapters():
|
||||
if not self.model.model.get_list_adapters():
|
||||
self.model.model.load_lora_weights(self.model.lcm_lora_id)
|
||||
else:
|
||||
self.model.model.disable_lora()
|
||||
|
||||
Reference in New Issue
Block a user