hitmare-patch-1 #2

Merged
Hitmare merged 7 commits from hitmare-patch-1 into main 2023-12-23 15:10:34 +00:00
1 changed file with 107 additions and 101 deletions
Showing only changes of commit cc119297ba - Show all commits

View File

@ -1,101 +1,107 @@
import torch.nn
import time

from modules import script_callbacks, shared, devices

# All registered SdUnetOption objects; rebuilt in place by list_unets().
unet_options = []
# The SdUnetOption currently applied via apply_unet(), or None when the
# built-in unet is in use.
current_unet_option = None
# The active SdUnet instance created from current_unet_option, or None.
current_unet = None
original_forward = None  # not used, only left temporarily for compatibility
def list_unets():
    """Repopulate the module-level ``unet_options`` list from registered callbacks.

    The list object itself is kept (mutated in place) so other modules that
    hold a reference to it see the refreshed contents.
    """
    discovered = script_callbacks.list_unets_callback()

    # In-place slice assignment == clear() followed by extend().
    unet_options[:] = discovered
def get_unet_option(option=None):
    """Resolve a unet selection to its SdUnetOption.

    ``option`` is a UI label; when falsy, the configured ``shared.opts.sd_unet``
    is used. "None" selects the built-in unet (returns None); "Automatic" picks
    the option whose model_name matches the loaded checkpoint. Returns None
    when no registered option matches.
    """
    label = option if option else shared.opts.sd_unet

    if label == "None":
        return None

    if label == "Automatic":
        checkpoint_name = shared.sd_model.sd_checkpoint_info.model_name
        matching = [entry for entry in unet_options if entry.model_name == checkpoint_name]
        label = matching[0].label if matching else "None"

    for entry in unet_options:
        if entry.label == label:
            return entry

    return None
def apply_unet(option=None):
    """Activate the replacement unet selected by ``option``.

    Resolves ``option`` through get_unet_option(); does nothing if the
    resolved option is already active. Otherwise deactivates the current
    replacement unet (if any), then either restores the built-in unet
    (resolved option is None) or creates and activates the new one.
    """
    global current_unet_option
    global current_unet

    new_option = get_unet_option(option)
    if new_option == current_unet_option:
        return

    if current_unet is not None:
        # Fixed typo in log message: "Dectivating" -> "Deactivating".
        print(f"Deactivating unet: {current_unet.option.label}")
        current_unet.deactivate()

    current_unet_option = new_option
    if current_unet_option is None:
        current_unet = None

        # No replacement unet: move the built-in unet back onto the compute
        # device, unless lowvram mode manages device placement itself.
        if not shared.sd_model.lowvram:
            shared.sd_model.model.diffusion_model.to(devices.device)

        return

    # Free GPU memory held by the built-in unet before the replacement loads.
    shared.sd_model.model.diffusion_model.to(devices.cpu)
    devices.torch_gc()

    current_unet = current_unet_option.create_unet()
    current_unet.option = current_unet_option
    print(f"Activating unet: {current_unet.option.label}")
    current_unet.activate()
class SdUnetOption:
    """Descriptor for a selectable replacement unet, registered via script callbacks."""

    # Name of the related checkpoint - this option will be selected
    # automatically for the unet if the name of the checkpoint matches it.
    model_name = None

    # Name of the unet shown in the UI.
    label = None

    def create_unet(self):
        """Return a SdUnet object to be used as a unet instead of the built-in one when making pictures."""
        raise NotImplementedError()
class SdUnet(torch.nn.Module):
    """Base class for replacement unets; subclasses must implement forward()."""

    def forward(self, x, timesteps, context, *args, **kwargs):
        """Run the unet; must be overridden by the implementation."""
        raise NotImplementedError()

    def activate(self):
        """Hook called when this unet becomes the active one; no-op by default."""
        pass

    def deactivate(self):
        """Hook called when this unet is switched away from; no-op by default."""
        pass
def create_unet_forward(original_forward):
    """Build a replacement for UNetModel.forward that routes through the
    active replacement unet (module global ``current_unet``) when one is set.

    If the replacement raises — or is rejected up front, e.g. a TRT unet with
    a LoRA tag in the prompt — the call falls back to ``original_forward`` and
    the prompt is remembered in ``shared.skip_unet_prompt`` so later calls for
    the same prompt bypass the replacement immediately.
    """
    def UNetModel_forward(self, x, timesteps=None, context=None, *args, **kwargs):
        try:
            if current_unet is not None and shared.current_prompt != shared.skip_unet_prompt:
                # TRT engines are compiled without LoRA weights, so reject
                # the combination before running the engine.
                # RuntimeError instead of bare Exception; it is caught by the
                # handler below, so callers see no difference.
                if '[TRT]' in shared.opts.sd_unet and '<lora:' in shared.current_prompt:
                    raise RuntimeError('LoRA unsupported in TRT UNet')
                return current_unet.forward(x, timesteps, context, *args, **kwargs)
        except Exception as e:
            start = time.time()
            print('[UNet] Skipping TRT UNet for this request:', e, '-', shared.current_prompt)
            # Ensure the built-in unet is on the compute device for the fallback.
            shared.sd_model.model.diffusion_model.to(devices.device)
            shared.skip_unet_prompt = shared.current_prompt
            print('[UNet] Used', time.time() - start, 'seconds')

        return original_forward(self, x, timesteps, context, *args, **kwargs)

    return UNetModel_forward