Browse Source

ini from SERVER0009

Your Name 1 year ago
100 changed files with 1405 additions and 0 deletions
  1. BIN
  2. BIN
  3. BIN
  4. BIN
  5. BIN
  6. BIN
  7. BIN
  8. BIN
  9. BIN
  10. BIN
  11. BIN
  12. BIN
  13. BIN
  14. BIN
  15. BIN
  16. BIN
  17. BIN
  18. BIN
  19. BIN
  20. BIN
  21. BIN
  22. BIN
  23. BIN
  24. BIN
  25. BIN
  26. BIN
  27. BIN
  28. BIN
  29. BIN
  30. BIN
  31. BIN
  32. BIN
  33. BIN
  34. BIN
  35. BIN
  36. BIN
  37. BIN
  38. BIN
  39. BIN
  40. BIN
  41. BIN
  42. BIN
  43. BIN
  44. BIN
  45. BIN
  46. BIN
  47. BIN
  48. BIN
  49. BIN
  50. BIN
  51. BIN
  52. BIN
  53. BIN
  54. BIN
  55. BIN
  56. BIN
  57. BIN
  58. BIN
  59. BIN
  60. BIN
  61. BIN
  62. BIN
  63. BIN
  64. BIN
  65. BIN
  66. BIN
  67. BIN
  68. BIN
  69. BIN
  70. BIN
  71. BIN
  72. BIN
  73. BIN
  74. BIN
  75. BIN
  76. BIN
  77. BIN
  78. BIN
  79. BIN
  80. BIN
  81. BIN
  82. BIN
  83. BIN
  84. BIN
  85. BIN
  86. BIN
  87. BIN
  88. BIN
  89. BIN
  90. BIN
  91. BIN
  92. BIN
  93. BIN
  94. BIN
  95. BIN
  96. 742 0
  97. 312 0
  98. 120 0
  99. 119 0
  100. 112 0
































































































+ 742 - 0

@@ -0,0 +1,742 @@
+import base64
+import io
+import os
+import time
+import datetime
+import uvicorn
+import gradio as gr
+from threading import Lock
+from io import BytesIO
+from fastapi import APIRouter, Depends, FastAPI, Request, Response
+from import HTTPBasic, HTTPBasicCredentials
+from fastapi.exceptions import HTTPException
+from fastapi.responses import JSONResponse
+from fastapi.encoders import jsonable_encoder
+from secrets import compare_digest
+import modules.shared as shared
+from modules import sd_samplers, deepbooru, sd_hijack, images, scripts, ui, postprocessing, errors, restart
+from modules.api import models
+from modules.shared import opts
+from modules.processing import StableDiffusionProcessingTxt2Img, StableDiffusionProcessingImg2Img, process_images
+from modules.textual_inversion.textual_inversion import create_embedding, train_embedding
+from modules.textual_inversion.preprocess import preprocess
+from modules.hypernetworks.hypernetwork import create_hypernetwork, train_hypernetwork
+from PIL import PngImagePlugin,Image
+from modules.sd_models import checkpoints_list, unload_model_weights, reload_model_weights, checkpoint_aliases
+from modules.sd_vae import vae_dict
+from modules.sd_models_config import find_checkpoint_config_near_filename
+from modules.realesrgan_model import get_realesrgan_models
+from modules import devices
+from typing import Dict, List, Any
+import piexif
+import piexif.helper
+from contextlib import closing
+def script_name_to_index(name, scripts):
+    try:
+        return [script.title().lower() for script in scripts].index(name.lower())
+    except Exception as e:
+        raise HTTPException(status_code=422, detail=f"Script '{name}' not found") from e
+def validate_sampler_name(name):
+    config = sd_samplers.all_samplers_map.get(name, None)
+    if config is None:
+        raise HTTPException(status_code=404, detail="Sampler not found")
+    return name
+def setUpscalers(req: dict):
+    reqDict = vars(req)
+    reqDict['extras_upscaler_1'] = reqDict.pop('upscaler_1', None)
+    reqDict['extras_upscaler_2'] = reqDict.pop('upscaler_2', None)
+    return reqDict
+def decode_base64_to_image(encoding):
+    if encoding.startswith("data:image/"):
+        encoding = encoding.split(";")[1].split(",")[1]
+    try:
+        image =
+        return image
+    except Exception as e:
+        raise HTTPException(status_code=500, detail="Invalid encoded image") from e
+def encode_pil_to_base64(image):
+    with io.BytesIO() as output_bytes:
+        if opts.samples_format.lower() == 'png':
+            use_metadata = False
+            metadata = PngImagePlugin.PngInfo()
+            for key, value in
+                if isinstance(key, str) and isinstance(value, str):
+                    metadata.add_text(key, value)
+                    use_metadata = True
+  , format="PNG", pnginfo=(metadata if use_metadata else None), quality=opts.jpeg_quality)
+        elif opts.samples_format.lower() in ("jpg", "jpeg", "webp"):
+            if image.mode == "RGBA":
+                image = image.convert("RGB")
+            parameters ='parameters', None)
+            exif_bytes = piexif.dump({
+                "Exif": { piexif.ExifIFD.UserComment: piexif.helper.UserComment.dump(parameters or "", encoding="unicode") }
+            })
+            if opts.samples_format.lower() in ("jpg", "jpeg"):
+      , format="JPEG", exif = exif_bytes, quality=opts.jpeg_quality)
+            else:
+      , format="WEBP", exif = exif_bytes, quality=opts.jpeg_quality)
+        else:
+            raise HTTPException(status_code=500, detail="Invalid image format")
+        bytes_data = output_bytes.getvalue()
+    return base64.b64encode(bytes_data)
+def api_middleware(app: FastAPI):
+    rich_available = False
+    try:
+        if os.environ.get('WEBUI_RICH_EXCEPTIONS', None) is not None:
+            import anyio  # importing just so it can be placed on silent list
+            import starlette  # importing just so it can be placed on silent list
+            from rich.console import Console
+            console = Console()
+            rich_available = True
+    except Exception:
+        pass
+    @app.middleware("http")
+    async def log_and_time(req: Request, call_next):
+        ts = time.time()
+        res: Response = await call_next(req)
+        duration = str(round(time.time() - ts, 4))
+        res.headers["X-Process-Time"] = duration
+        endpoint = req.scope.get('path', 'err')
+        if shared.cmd_opts.api_log and endpoint.startswith('/sdapi'):
+            print('API {t} {code} {prot}/{ver} {method} {endpoint} {cli} {duration}'.format(
+      "%Y-%m-%d %H:%M:%S.%f"),
+                code=res.status_code,
+                ver=req.scope.get('http_version', '0.0'),
+                cli=req.scope.get('client', ('0:0.0.0', 0))[0],
+                prot=req.scope.get('scheme', 'err'),
+                method=req.scope.get('method', 'err'),
+                endpoint=endpoint,
+                duration=duration,
+            ))
+        return res
+    def handle_exception(request: Request, e: Exception):
+        err = {
+            "error": type(e).__name__,
+            "detail": vars(e).get('detail', ''),
+            "body": vars(e).get('body', ''),
+            "errors": str(e),
+        }
+        if not isinstance(e, HTTPException):  # do not print backtrace on known httpexceptions
+            message = f"API error: {request.method}: {request.url} {err}"
+            if rich_available:
+                print(message)
+                console.print_exception(show_locals=True, max_frames=2, extra_lines=1, suppress=[anyio, starlette], word_wrap=False, width=min([console.width, 200]))
+            else:
+      , exc_info=True)
+        return JSONResponse(status_code=vars(e).get('status_code', 500), content=jsonable_encoder(err))
+    @app.middleware("http")
+    async def exception_handling(request: Request, call_next):
+        try:
+            return await call_next(request)
+        except Exception as e:
+            return handle_exception(request, e)
+    @app.exception_handler(Exception)
+    async def fastapi_exception_handler(request: Request, e: Exception):
+        return handle_exception(request, e)
+    @app.exception_handler(HTTPException)
+    async def http_exception_handler(request: Request, e: HTTPException):
+        return handle_exception(request, e)
+class Api:
+    def __init__(self, app: FastAPI, queue_lock: Lock):
+        if shared.cmd_opts.api_auth:
+            self.credentials = {}
+            for auth in shared.cmd_opts.api_auth.split(","):
+                user, password = auth.split(":")
+                self.credentials[user] = password
+        self.router = APIRouter()
+ = app
+        self.queue_lock = queue_lock
+        api_middleware(
+        self.add_api_route("/sdapi/v1/txt2img", self.text2imgapi, methods=["POST"], response_model=models.TextToImageResponse)
+        self.add_api_route("/sdapi/v1/img2img", self.img2imgapi, methods=["POST"], response_model=models.ImageToImageResponse)
+        self.add_api_route("/sdapi/v1/extra-single-image", self.extras_single_image_api, methods=["POST"], response_model=models.ExtrasSingleImageResponse)
+        self.add_api_route("/sdapi/v1/extra-batch-images", self.extras_batch_images_api, methods=["POST"], response_model=models.ExtrasBatchImagesResponse)
+        self.add_api_route("/sdapi/v1/png-info", self.pnginfoapi, methods=["POST"], response_model=models.PNGInfoResponse)
+        self.add_api_route("/sdapi/v1/progress", self.progressapi, methods=["GET"], response_model=models.ProgressResponse)
+        self.add_api_route("/sdapi/v1/interrogate", self.interrogateapi, methods=["POST"])
+        self.add_api_route("/sdapi/v1/interrupt", self.interruptapi, methods=["POST"])
+        self.add_api_route("/sdapi/v1/skip", self.skip, methods=["POST"])
+        self.add_api_route("/sdapi/v1/options", self.get_config, methods=["GET"], response_model=models.OptionsModel)
+        self.add_api_route("/sdapi/v1/options", self.set_config, methods=["POST"])
+        self.add_api_route("/sdapi/v1/cmd-flags", self.get_cmd_flags, methods=["GET"], response_model=models.FlagsModel)
+        self.add_api_route("/sdapi/v1/samplers", self.get_samplers, methods=["GET"], response_model=List[models.SamplerItem])
+        self.add_api_route("/sdapi/v1/upscalers", self.get_upscalers, methods=["GET"], response_model=List[models.UpscalerItem])
+        self.add_api_route("/sdapi/v1/latent-upscale-modes", self.get_latent_upscale_modes, methods=["GET"], response_model=List[models.LatentUpscalerModeItem])
+        self.add_api_route("/sdapi/v1/sd-models", self.get_sd_models, methods=["GET"], response_model=List[models.SDModelItem])
+        self.add_api_route("/sdapi/v1/sd-vae", self.get_sd_vaes, methods=["GET"], response_model=List[models.SDVaeItem])
+        self.add_api_route("/sdapi/v1/hypernetworks", self.get_hypernetworks, methods=["GET"], response_model=List[models.HypernetworkItem])
+        self.add_api_route("/sdapi/v1/face-restorers", self.get_face_restorers, methods=["GET"], response_model=List[models.FaceRestorerItem])
+        self.add_api_route("/sdapi/v1/realesrgan-models", self.get_realesrgan_models, methods=["GET"], response_model=List[models.RealesrganItem])
+        self.add_api_route("/sdapi/v1/prompt-styles", self.get_prompt_styles, methods=["GET"], response_model=List[models.PromptStyleItem])
+        self.add_api_route("/sdapi/v1/embeddings", self.get_embeddings, methods=["GET"], response_model=models.EmbeddingsResponse)
+        self.add_api_route("/sdapi/v1/refresh-checkpoints", self.refresh_checkpoints, methods=["POST"])
+        self.add_api_route("/sdapi/v1/create/embedding", self.create_embedding, methods=["POST"], response_model=models.CreateResponse)
+        self.add_api_route("/sdapi/v1/create/hypernetwork", self.create_hypernetwork, methods=["POST"], response_model=models.CreateResponse)
+        self.add_api_route("/sdapi/v1/preprocess", self.preprocess, methods=["POST"], response_model=models.PreprocessResponse)
+        self.add_api_route("/sdapi/v1/train/embedding", self.train_embedding, methods=["POST"], response_model=models.TrainResponse)
+        self.add_api_route("/sdapi/v1/train/hypernetwork", self.train_hypernetwork, methods=["POST"], response_model=models.TrainResponse)
+        self.add_api_route("/sdapi/v1/memory", self.get_memory, methods=["GET"], response_model=models.MemoryResponse)
+        self.add_api_route("/sdapi/v1/unload-checkpoint", self.unloadapi, methods=["POST"])
+        self.add_api_route("/sdapi/v1/reload-checkpoint", self.reloadapi, methods=["POST"])
+        self.add_api_route("/sdapi/v1/scripts", self.get_scripts_list, methods=["GET"], response_model=models.ScriptsList)
+        self.add_api_route("/sdapi/v1/script-info", self.get_script_info, methods=["GET"], response_model=List[models.ScriptInfo])
+        if shared.cmd_opts.api_server_stop:
+            self.add_api_route("/sdapi/v1/server-kill", self.kill_webui, methods=["POST"])
+            self.add_api_route("/sdapi/v1/server-restart", self.restart_webui, methods=["POST"])
+            self.add_api_route("/sdapi/v1/server-stop", self.stop_webui, methods=["POST"])
+        self.default_script_arg_txt2img = []
+        self.default_script_arg_img2img = []
+    def add_api_route(self, path: str, endpoint, **kwargs):
+        if shared.cmd_opts.api_auth:
+            return, endpoint, dependencies=[Depends(self.auth)], **kwargs)
+        return, endpoint, **kwargs)
+    def auth(self, credentials: HTTPBasicCredentials = Depends(HTTPBasic())):
+        if credentials.username in self.credentials:
+            if compare_digest(credentials.password, self.credentials[credentials.username]):
+                return True
+        raise HTTPException(status_code=401, detail="Incorrect username or password", headers={"WWW-Authenticate": "Basic"})
+    def get_selectable_script(self, script_name, script_runner):
+        if script_name is None or script_name == "":
+            return None, None
+        script_idx = script_name_to_index(script_name, script_runner.selectable_scripts)
+        script = script_runner.selectable_scripts[script_idx]
+        return script, script_idx
+    def get_scripts_list(self):
+        t2ilist = [ for script in scripts.scripts_txt2img.scripts if is not None]
+        i2ilist = [ for script in scripts.scripts_img2img.scripts if is not None]
+        return models.ScriptsList(txt2img=t2ilist, img2img=i2ilist)
+    def get_script_info(self):
+        res = []
+        for script_list in [scripts.scripts_txt2img.scripts, scripts.scripts_img2img.scripts]:
+            res += [script.api_info for script in script_list if script.api_info is not None]
+        return res
+    def get_script(self, script_name, script_runner):
+        if script_name is None or script_name == "":
+            return None, None
+        script_idx = script_name_to_index(script_name, script_runner.scripts)
+        return script_runner.scripts[script_idx]
+    def init_default_script_args(self, script_runner):
+        #find max idx from the scripts in runner and generate a none array to init script_args
+        last_arg_index = 1
+        for script in script_runner.scripts:
+            if last_arg_index < script.args_to:
+                last_arg_index = script.args_to
+        # None everywhere except position 0 to initialize script args
+        script_args = [None]*last_arg_index
+        script_args[0] = 0
+        # get default values
+        with gr.Blocks(): # will throw errors calling ui function without this
+            for script in script_runner.scripts:
+                if script.ui(script.is_img2img):
+                    ui_default_values = []
+                    for elem in script.ui(script.is_img2img):
+                        ui_default_values.append(elem.value)
+                    script_args[script.args_from:script.args_to] = ui_default_values
+        return script_args
+    def init_script_args(self, request, default_script_args, selectable_scripts, selectable_idx, script_runner):
+        script_args = default_script_args.copy()
+        # position 0 in script_arg is the idx+1 of the selectable script that is going to be run when using scripts.scripts_*
+        if selectable_scripts:
+            script_args[selectable_scripts.args_from:selectable_scripts.args_to] = request.script_args
+            script_args[0] = selectable_idx + 1
+        # Now check for always on scripts
+        if request.alwayson_scripts:
+            for alwayson_script_name in request.alwayson_scripts.keys():
+                alwayson_script = self.get_script(alwayson_script_name, script_runner)
+                if alwayson_script is None:
+                    raise HTTPException(status_code=422, detail=f"always on script {alwayson_script_name} not found")
+                # Selectable script in always on script param check
+                if alwayson_script.alwayson is False:
+                    raise HTTPException(status_code=422, detail="Cannot have a selectable script in the always on scripts params")
+                # always on script with no arg should always run so you don't really need to add them to the requests
+                if "args" in request.alwayson_scripts[alwayson_script_name]:
+                    # min between arg length in scriptrunner and arg length in the request
+                    for idx in range(0, min((alwayson_script.args_to - alwayson_script.args_from), len(request.alwayson_scripts[alwayson_script_name]["args"]))):
+                        script_args[alwayson_script.args_from + idx] = request.alwayson_scripts[alwayson_script_name]["args"][idx]
+        return script_args
+    def text2imgapi(self, txt2imgreq: models.StableDiffusionTxt2ImgProcessingAPI):
+        script_runner = scripts.scripts_txt2img
+        if not script_runner.scripts:
+            script_runner.initialize_scripts(False)
+            ui.create_ui()
+        if not self.default_script_arg_txt2img:
+            self.default_script_arg_txt2img = self.init_default_script_args(script_runner)
+        selectable_scripts, selectable_script_idx = self.get_selectable_script(txt2imgreq.script_name, script_runner)
+        populate = txt2imgreq.copy(update={  # Override __init__ params
+            "sampler_name": validate_sampler_name(txt2imgreq.sampler_name or txt2imgreq.sampler_index),
+            "do_not_save_samples": not txt2imgreq.save_images,
+            "do_not_save_grid": not txt2imgreq.save_images,
+        })
+        if populate.sampler_name:
+            populate.sampler_index = None  # prevent a warning later on
+        args = vars(populate)
+        args.pop('script_name', None)
+        args.pop('script_args', None) # will refeed them to the pipeline directly after initializing them
+        args.pop('alwayson_scripts', None)
+        script_args = self.init_script_args(txt2imgreq, self.default_script_arg_txt2img, selectable_scripts, selectable_script_idx, script_runner)
+        send_images = args.pop('send_images', True)
+        args.pop('save_images', None)
+        with self.queue_lock:
+            with closing(StableDiffusionProcessingTxt2Img(sd_model=shared.sd_model, **args)) as p:
+                p.scripts = script_runner
+                p.outpath_grids = opts.outdir_txt2img_grids
+                p.outpath_samples = opts.outdir_txt2img_samples
+                try:
+                    shared.state.begin(job="scripts_txt2img")
+                    if selectable_scripts is not None:
+                        p.script_args = script_args
+                        processed =, *p.script_args) # Need to pass args as list here
+                    else:
+                        p.script_args = tuple(script_args) # Need to pass args as tuple here
+                        processed = process_images(p)
+                finally:
+                    shared.state.end()
+        b64images = list(map(encode_pil_to_base64, processed.images)) if send_images else []
+        return models.TextToImageResponse(images=b64images, parameters=vars(txt2imgreq), info=processed.js())
+    def img2imgapi(self, img2imgreq: models.StableDiffusionImg2ImgProcessingAPI):
+        init_images = img2imgreq.init_images
+        if init_images is None:
+            raise HTTPException(status_code=404, detail="Init image not found")
+        mask = img2imgreq.mask
+        if mask:
+            mask = decode_base64_to_image(mask)
+        script_runner = scripts.scripts_img2img
+        if not script_runner.scripts:
+            script_runner.initialize_scripts(True)
+            ui.create_ui()
+        if not self.default_script_arg_img2img:
+            self.default_script_arg_img2img = self.init_default_script_args(script_runner)
+        selectable_scripts, selectable_script_idx = self.get_selectable_script(img2imgreq.script_name, script_runner)
+        populate = img2imgreq.copy(update={  # Override __init__ params
+            "sampler_name": validate_sampler_name(img2imgreq.sampler_name or img2imgreq.sampler_index),
+            "do_not_save_samples": not img2imgreq.save_images,
+            "do_not_save_grid": not img2imgreq.save_images,
+            "mask": mask,
+        })
+        if populate.sampler_name:
+            populate.sampler_index = None  # prevent a warning later on
+        args = vars(populate)
+        args.pop('include_init_images', None)  # this is meant to be done by "exclude": True in model, but it's for a reason that I cannot determine.
+        args.pop('script_name', None)
+        args.pop('script_args', None)  # will refeed them to the pipeline directly after initializing them
+        args.pop('alwayson_scripts', None)
+        script_args = self.init_script_args(img2imgreq, self.default_script_arg_img2img, selectable_scripts, selectable_script_idx, script_runner)
+        send_images = args.pop('send_images', True)
+        args.pop('save_images', None)
+        with self.queue_lock:
+            with closing(StableDiffusionProcessingImg2Img(sd_model=shared.sd_model, **args)) as p:
+                p.init_images = [decode_base64_to_image(x) for x in init_images]
+                p.scripts = script_runner
+                p.outpath_grids = opts.outdir_img2img_grids
+                p.outpath_samples = opts.outdir_img2img_samples
+                try:
+                    shared.state.begin(job="scripts_img2img")
+                    if selectable_scripts is not None:
+                        p.script_args = script_args
+                        processed =, *p.script_args) # Need to pass args as list here
+                    else:
+                        p.script_args = tuple(script_args) # Need to pass args as tuple here
+                        processed = process_images(p)
+                finally:
+                    shared.state.end()
+        b64images = list(map(encode_pil_to_base64, processed.images)) if send_images else []
+        if not img2imgreq.include_init_images:
+            img2imgreq.init_images = None
+            img2imgreq.mask = None
+        return models.ImageToImageResponse(images=b64images, parameters=vars(img2imgreq), info=processed.js())
+    def extras_single_image_api(self, req: models.ExtrasSingleImageRequest):
+        reqDict = setUpscalers(req)
+        reqDict['image'] = decode_base64_to_image(reqDict['image'])
+        with self.queue_lock:
+            result = postprocessing.run_extras(extras_mode=0, image_folder="", input_dir="", output_dir="", save_output=False, **reqDict)
+        return models.ExtrasSingleImageResponse(image=encode_pil_to_base64(result[0][0]), html_info=result[1])
+    def extras_batch_images_api(self, req: models.ExtrasBatchImagesRequest):
+        reqDict = setUpscalers(req)
+        image_list = reqDict.pop('imageList', [])
+        image_folder = [decode_base64_to_image( for x in image_list]
+        with self.queue_lock:
+            result = postprocessing.run_extras(extras_mode=1, image_folder=image_folder, image="", input_dir="", output_dir="", save_output=False, **reqDict)
+        return models.ExtrasBatchImagesResponse(images=list(map(encode_pil_to_base64, result[0])), html_info=result[1])
+    def pnginfoapi(self, req: models.PNGInfoRequest):
+        if(not req.image.strip()):
+            return models.PNGInfoResponse(info="")
+        image = decode_base64_to_image(req.image.strip())
+        if image is None:
+            return models.PNGInfoResponse(info="")
+        geninfo, items = images.read_info_from_image(image)
+        if geninfo is None:
+            geninfo = ""
+        items = {**{'parameters': geninfo}, **items}
+        return models.PNGInfoResponse(info=geninfo, items=items)
+    def progressapi(self, req: models.ProgressRequest = Depends()):
+        # copy from check_progress_call of
+        if shared.state.job_count == 0:
+            return models.ProgressResponse(progress=0, eta_relative=0, state=shared.state.dict(), textinfo=shared.state.textinfo)
+        # avoid dividing zero
+        progress = 0.01
+        if shared.state.job_count > 0:
+            progress += shared.state.job_no / shared.state.job_count
+        if shared.state.sampling_steps > 0:
+            progress += 1 / shared.state.job_count * shared.state.sampling_step / shared.state.sampling_steps
+        time_since_start = time.time() - shared.state.time_start
+        eta = (time_since_start/progress)
+        eta_relative = eta-time_since_start
+        progress = min(progress, 1)
+        shared.state.set_current_image()
+        current_image = None
+        if shared.state.current_image and not req.skip_current_image:
+            current_image = encode_pil_to_base64(shared.state.current_image)
+        return models.ProgressResponse(progress=progress, eta_relative=eta_relative, state=shared.state.dict(), current_image=current_image, textinfo=shared.state.textinfo)
+    def interrogateapi(self, interrogatereq: models.InterrogateRequest):
+        image_b64 = interrogatereq.image
+        if image_b64 is None:
+            raise HTTPException(status_code=404, detail="Image not found")
+        img = decode_base64_to_image(image_b64)
+        img = img.convert('RGB')
+        # Override object param
+        with self.queue_lock:
+            if interrogatereq.model == "clip":
+                processed = shared.interrogator.interrogate(img)
+            elif interrogatereq.model == "deepdanbooru":
+                processed = deepbooru.model.tag(img)
+            else:
+                raise HTTPException(status_code=404, detail="Model not found")
+        return models.InterrogateResponse(caption=processed)
+    def interruptapi(self):
+        shared.state.interrupt()
+        return {}
+    def unloadapi(self):
+        unload_model_weights()
+        return {}
+    def reloadapi(self):
+        reload_model_weights()
+        return {}
+    def skip(self):
+        shared.state.skip()
+    def get_config(self):
+        options = {}
+        for key in
+            metadata = shared.opts.data_labels.get(key)
+            if(metadata is not None):
+                options.update({key:, shared.opts.data_labels.get(key).default)})
+            else:
+                options.update({key:, None)})
+        return options
+    def set_config(self, req: Dict[str, Any]):
+        checkpoint_name = req.get("sd_model_checkpoint", None)
+        if checkpoint_name is not None and checkpoint_name not in checkpoint_aliases:
+            raise RuntimeError(f"model {checkpoint_name!r} not found")
+        for k, v in req.items():
+            shared.opts.set(k, v)
+        return
+    def get_cmd_flags(self):
+        return vars(shared.cmd_opts)
+    def get_samplers(self):
+        return [{"name": sampler[0], "aliases":sampler[2], "options":sampler[3]} for sampler in sd_samplers.all_samplers]
+    def get_upscalers(self):
+        return [
+            {
+                "name":,
+                "model_name": upscaler.scaler.model_name,
+                "model_path": upscaler.data_path,
+                "model_url": None,
+                "scale": upscaler.scale,
+            }
+            for upscaler in shared.sd_upscalers
+        ]
+    def get_latent_upscale_modes(self):
+        return [
+            {
+                "name": upscale_mode,
+            }
+            for upscale_mode in [*(shared.latent_upscale_modes or {})]
+        ]
+    def get_sd_models(self):
+        return [{"title": x.title, "model_name": x.model_name, "hash": x.shorthash, "sha256": x.sha256, "filename": x.filename, "config": find_checkpoint_config_near_filename(x)} for x in checkpoints_list.values()]
+    def get_sd_vaes(self):
+        return [{"model_name": x, "filename": vae_dict[x]} for x in vae_dict.keys()]
+    def get_hypernetworks(self):
+        return [{"name": name, "path": shared.hypernetworks[name]} for name in shared.hypernetworks]
+    def get_face_restorers(self):
+        return [{"name", "cmd_dir": getattr(x, "cmd_dir", None)} for x in shared.face_restorers]
+    def get_realesrgan_models(self):
+        return [{"name","path":x.data_path, "scale":x.scale} for x in get_realesrgan_models(None)]
+    def get_prompt_styles(self):
+        styleList = []
+        for k in shared.prompt_styles.styles:
+            style = shared.prompt_styles.styles[k]
+            styleList.append({"name":style[0], "prompt": style[1], "negative_prompt": style[2]})
+        return styleList
+    def get_embeddings(self):
+        db = sd_hijack.model_hijack.embedding_db
+        def convert_embedding(embedding):
+            return {
+                "step": embedding.step,
+                "sd_checkpoint": embedding.sd_checkpoint,
+                "sd_checkpoint_name": embedding.sd_checkpoint_name,
+                "shape": embedding.shape,
+                "vectors": embedding.vectors,
+            }
+        def convert_embeddings(embeddings):
+            return { convert_embedding(embedding) for embedding in embeddings.values()}
+        return {
+            "loaded": convert_embeddings(db.word_embeddings),
+            "skipped": convert_embeddings(db.skipped_embeddings),
+        }
+    def refresh_checkpoints(self):
+        with self.queue_lock:
+            shared.refresh_checkpoints()
+    def create_embedding(self, args: dict):
+        try:
+            shared.state.begin(job="create_embedding")
+            filename = create_embedding(**args) # create empty embedding
+            sd_hijack.model_hijack.embedding_db.load_textual_inversion_embeddings() # reload embeddings so new one can be immediately used
+            return models.CreateResponse(info=f"create embedding filename: {filename}")
+        except AssertionError as e:
+            return models.TrainResponse(info=f"create embedding error: {e}")
+        finally:
+            shared.state.end()
+    def create_hypernetwork(self, args: dict):
+        try:
+            shared.state.begin(job="create_hypernetwork")
+            filename = create_hypernetwork(**args) # create empty embedding
+            return models.CreateResponse(info=f"create hypernetwork filename: {filename}")
+        except AssertionError as e:
+            return models.TrainResponse(info=f"create hypernetwork error: {e}")
+        finally:
+            shared.state.end()
+    def preprocess(self, args: dict):
+        try:
+            shared.state.begin(job="preprocess")
+            preprocess(**args) # quick operation unless blip/booru interrogation is enabled
+            shared.state.end()
+            return models.PreprocessResponse(info='preprocess complete')
+        except KeyError as e:
+            return models.PreprocessResponse(info=f"preprocess error: invalid token: {e}")
+        except Exception as e:
+            return models.PreprocessResponse(info=f"preprocess error: {e}")
+        finally:
+            shared.state.end()
+    def train_embedding(self, args: dict):
+        try:
+            shared.state.begin(job="train_embedding")
+            apply_optimizations = shared.opts.training_xattention_optimizations
+            error = None
+            filename = ''
+            if not apply_optimizations:
+                sd_hijack.undo_optimizations()
+            try:
+                embedding, filename = train_embedding(**args) # can take a long time to complete
+            except Exception as e:
+                error = e
+            finally:
+                if not apply_optimizations:
+                    sd_hijack.apply_optimizations()
+            return models.TrainResponse(info=f"train embedding complete: filename: {filename} error: {error}")
+        except Exception as msg:
+            return models.TrainResponse(info=f"train embedding error: {msg}")
+        finally:
+            shared.state.end()
+    def train_hypernetwork(self, args: dict):
+        try:
+            shared.state.begin(job="train_hypernetwork")
+            shared.loaded_hypernetworks = []
+            apply_optimizations = shared.opts.training_xattention_optimizations
+            error = None
+            filename = ''
+            if not apply_optimizations:
+                sd_hijack.undo_optimizations()
+            try:
+                hypernetwork, filename = train_hypernetwork(**args)
+            except Exception as e:
+                error = e
+            finally:
+                if not apply_optimizations:
+                    sd_hijack.apply_optimizations()
+                shared.state.end()
+            return models.TrainResponse(info=f"train embedding complete: filename: {filename} error: {error}")
+        except Exception as exc:
+            return models.TrainResponse(info=f"train embedding error: {exc}")
+        finally:
+            shared.state.end()
+    def get_memory(self):
+        try:
+            import os
+            import psutil
+            process = psutil.Process(os.getpid())
+            res = process.memory_info() # only rss is cross-platform guaranteed so we dont rely on other values
+            ram_total = 100 * res.rss / process.memory_percent() # and total memory is calculated as actual value is not cross-platform safe
+            ram = { 'free': ram_total - res.rss, 'used': res.rss, 'total': ram_total }
+        except Exception as err:
+            ram = { 'error': f'{err}' }
+        try:
+            import torch
+            if torch.cuda.is_available():
+                s = torch.cuda.mem_get_info()
+                system = { 'free': s[0], 'used': s[1] - s[0], 'total': s[1] }
+                s = dict(torch.cuda.memory_stats(shared.device))
+                allocated = { 'current': s['allocated_bytes.all.current'], 'peak': s['allocated_bytes.all.peak'] }
+                reserved = { 'current': s['reserved_bytes.all.current'], 'peak': s['reserved_bytes.all.peak'] }
+                active = { 'current': s['active_bytes.all.current'], 'peak': s['active_bytes.all.peak'] }
+                inactive = { 'current': s['inactive_split_bytes.all.current'], 'peak': s['inactive_split_bytes.all.peak'] }
+                warnings = { 'retries': s['num_alloc_retries'], 'oom': s['num_ooms'] }
+                cuda = {
+                    'system': system,
+                    'active': active,
+                    'allocated': allocated,
+                    'reserved': reserved,
+                    'inactive': inactive,
+                    'events': warnings,
+                }
+            else:
+                cuda = {'error': 'unavailable'}
+        except Exception as err:
+            cuda = {'error': f'{err}'}
+        return models.MemoryResponse(ram=ram, cuda=cuda)
+    def launch(self, server_name, port, root_path):
+, host=server_name, port=port, timeout_keep_alive=shared.cmd_opts.timeout_keep_alive, root_path=root_path)
+    def kill_webui(self):
+        restart.stop_program()
+    def restart_webui(self):
+        if restart.is_restartable():
+            restart.restart_program()
+        return Response(status_code=501)
+    def stop_webui(request):
+        shared.state.server_command = "stop"
+        return Response("Stopping.")

+ 312 - 0

@@ -0,0 +1,312 @@
+import inspect
+from pydantic import BaseModel, Field, create_model
+from typing import Any, Optional
+from typing_extensions import Literal
+from inflection import underscore
+from modules.processing import StableDiffusionProcessingTxt2Img, StableDiffusionProcessingImg2Img
+from modules.shared import sd_upscalers, opts, parser
+from typing import Dict, List
+    "self",
+    "kwargs",
+    "sd_model",
+    "outpath_samples",
+    "outpath_grids",
+    "sampler_index",
+    # "do_not_save_samples",
+    # "do_not_save_grid",
+    "extra_generation_params",
+    "overlay_images",
+    "do_not_reload_embeddings",
+    "seed_enable_extras",
+    "prompt_for_display",
+    "sampler_noise_scheduler_override",
+    "ddim_discretize"
+class ModelDef(BaseModel):
+    """Assistance Class for Pydantic Dynamic Model Generation"""
+    field: str
+    field_alias: str
+    field_type: Any
+    field_value: Any
+    field_exclude: bool = False
+class PydanticModelGenerator:
+    """
+    Takes in created classes and stubs them out in a way FastAPI/Pydantic is happy about:
+    source_data is a snapshot of the default values produced by the class
+    params are the names of the actual keys required by __init__
+    """
+    def __init__(
+        self,
+        model_name: str = None,
+        class_instance = None,
+        additional_fields = None,
+    ):
+        def field_type_generator(k, v):
+            # field_type = str if not overrides.get(k) else overrides[k]["type"]
+            # print(k, v.annotation, v.default)
+            field_type = v.annotation
+            return Optional[field_type]
+        def merge_class_params(class_):
+            all_classes = list(filter(lambda x: x is not object, inspect.getmro(class_)))
+            parameters = {}
+            for classes in all_classes:
+                parameters = {**parameters, **inspect.signature(classes.__init__).parameters}
+            return parameters
+        self._model_name = model_name
+        self._class_data = merge_class_params(class_instance)
+        self._model_def = [
+            ModelDef(
+                field=underscore(k),
+                field_alias=k,
+                field_type=field_type_generator(k, v),
+                field_value=v.default
+            )
+            for (k,v) in self._class_data.items() if k not in API_NOT_ALLOWED
+        ]
+        for fields in additional_fields:
+            self._model_def.append(ModelDef(
+                field=underscore(fields["key"]),
+                field_alias=fields["key"],
+                field_type=fields["type"],
+                field_value=fields["default"],
+                field_exclude=fields["exclude"] if "exclude" in fields else False))
+    def generate_model(self):
+        """
+        Creates a pydantic BaseModel
+        from the json and overrides provided at initialization
+        """
+        fields = {
+            d.field: (d.field_type, Field(default=d.field_value, alias=d.field_alias, exclude=d.field_exclude)) for d in self._model_def
+        }
+        DynamicModel = create_model(self._model_name, **fields)
+        DynamicModel.__config__.allow_population_by_field_name = True
+        DynamicModel.__config__.allow_mutation = True
+        return DynamicModel
+StableDiffusionTxt2ImgProcessingAPI = PydanticModelGenerator(
+    "StableDiffusionProcessingTxt2Img",
+    StableDiffusionProcessingTxt2Img,
+    [
+        {"key": "sampler_index", "type": str, "default": "Euler"},
+        {"key": "script_name", "type": str, "default": None},
+        {"key": "script_args", "type": list, "default": []},
+        {"key": "send_images", "type": bool, "default": True},
+        {"key": "save_images", "type": bool, "default": False},
+        {"key": "alwayson_scripts", "type": dict, "default": {}},
+    ]
+StableDiffusionImg2ImgProcessingAPI = PydanticModelGenerator(
+    "StableDiffusionProcessingImg2Img",
+    StableDiffusionProcessingImg2Img,
+    [
+        {"key": "sampler_index", "type": str, "default": "Euler"},
+        {"key": "init_images", "type": list, "default": None},
+        {"key": "denoising_strength", "type": float, "default": 0.75},
+        {"key": "mask", "type": str, "default": None},
+        {"key": "include_init_images", "type": bool, "default": False, "exclude" : True},
+        {"key": "script_name", "type": str, "default": None},
+        {"key": "script_args", "type": list, "default": []},
+        {"key": "send_images", "type": bool, "default": True},
+        {"key": "save_images", "type": bool, "default": False},
+        {"key": "alwayson_scripts", "type": dict, "default": {}},
+    ]
+class TextToImageResponse(BaseModel):
+    images: List[str] = Field(default=None, title="Image", description="The generated image in base64 format.")
+    parameters: dict
+    info: str
+class ImageToImageResponse(BaseModel):
+    images: List[str] = Field(default=None, title="Image", description="The generated image in base64 format.")
+    parameters: dict
+    info: str
+class ExtrasBaseRequest(BaseModel):
+    resize_mode: Literal[0, 1] = Field(default=0, title="Resize Mode", description="Sets the resize mode: 0 to upscale by upscaling_resize amount, 1 to upscale up to upscaling_resize_h x upscaling_resize_w.")
+    show_extras_results: bool = Field(default=True, title="Show results", description="Should the backend return the generated image?")
+    gfpgan_visibility: float = Field(default=0, title="GFPGAN Visibility", ge=0, le=1, allow_inf_nan=False, description="Sets the visibility of GFPGAN, values should be between 0 and 1.")
+    codeformer_visibility: float = Field(default=0, title="CodeFormer Visibility", ge=0, le=1, allow_inf_nan=False, description="Sets the visibility of CodeFormer, values should be between 0 and 1.")
+    codeformer_weight: float = Field(default=0, title="CodeFormer Weight", ge=0, le=1, allow_inf_nan=False, description="Sets the weight of CodeFormer, values should be between 0 and 1.")
+    upscaling_resize: float = Field(default=2, title="Upscaling Factor", ge=1, le=8, description="By how much to upscale the image, only used when resize_mode=0.")
+    upscaling_resize_w: int = Field(default=512, title="Target Width", ge=1, description="Target width for the upscaler to hit. Only used when resize_mode=1.")
+    upscaling_resize_h: int = Field(default=512, title="Target Height", ge=1, description="Target height for the upscaler to hit. Only used when resize_mode=1.")
+    upscaling_crop: bool = Field(default=True, title="Crop to fit", description="Should the upscaler crop the image to fit in the chosen size?")
+    upscaler_1: str = Field(default="None", title="Main upscaler", description=f"The name of the main upscaler to use, it has to be one of this list: {' , '.join([ for x in sd_upscalers])}")
+    upscaler_2: str = Field(default="None", title="Secondary upscaler", description=f"The name of the secondary upscaler to use, it has to be one of this list: {' , '.join([ for x in sd_upscalers])}")
+    extras_upscaler_2_visibility: float = Field(default=0, title="Secondary upscaler visibility", ge=0, le=1, allow_inf_nan=False, description="Sets the visibility of secondary upscaler, values should be between 0 and 1.")
+    upscale_first: bool = Field(default=False, title="Upscale first", description="Should the upscaler run before restoring faces?")
+class ExtraBaseResponse(BaseModel):
+    html_info: str = Field(title="HTML info", description="A series of HTML tags containing the process info.")
+class ExtrasSingleImageRequest(ExtrasBaseRequest):
+    image: str = Field(default="", title="Image", description="Image to work on, must be a Base64 string containing the image's data.")
+class ExtrasSingleImageResponse(ExtraBaseResponse):
+    image: str = Field(default=None, title="Image", description="The generated image in base64 format.")
+class FileData(BaseModel):
+    data: str = Field(title="File data", description="Base64 representation of the file")
+    name: str = Field(title="File name")
+class ExtrasBatchImagesRequest(ExtrasBaseRequest):
+    imageList: List[FileData] = Field(title="Images", description="List of images to work on. Must be Base64 strings")
+class ExtrasBatchImagesResponse(ExtraBaseResponse):
+    images: List[str] = Field(title="Images", description="The generated images in base64 format.")
+class PNGInfoRequest(BaseModel):
+    image: str = Field(title="Image", description="The base64 encoded PNG image")
+class PNGInfoResponse(BaseModel):
+    info: str = Field(title="Image info", description="A string with the parameters used to generate the image")
+    items: dict = Field(title="Items", description="An object containing all the info the image had")
+class ProgressRequest(BaseModel):
+    skip_current_image: bool = Field(default=False, title="Skip current image", description="Skip current image serialization")
+class ProgressResponse(BaseModel):
+    progress: float = Field(title="Progress", description="The progress with a range of 0 to 1")
+    eta_relative: float = Field(title="ETA in secs")
+    state: dict = Field(title="State", description="The current state snapshot")
+    current_image: str = Field(default=None, title="Current image", description="The current image in base64 format. opts.show_progress_every_n_steps is required for this to work.")
+    textinfo: str = Field(default=None, title="Info text", description="Info text used by WebUI.")
+class InterrogateRequest(BaseModel):
+    image: str = Field(default="", title="Image", description="Image to work on, must be a Base64 string containing the image's data.")
+    model: str = Field(default="clip", title="Model", description="The interrogate model used.")
+class InterrogateResponse(BaseModel):
+    caption: str = Field(default=None, title="Caption", description="The generated caption for the image.")
+class TrainResponse(BaseModel):
+    info: str = Field(title="Train info", description="Response string from train embedding or hypernetwork task.")
+class CreateResponse(BaseModel):
+    info: str = Field(title="Create info", description="Response string from create embedding or hypernetwork task.")
+class PreprocessResponse(BaseModel):
+    info: str = Field(title="Preprocess info", description="Response string from preprocessing task.")
+fields = {}
+for key, metadata in opts.data_labels.items():
+    value =
+    optType = opts.typemap.get(type(metadata.default), type(metadata.default)) if metadata.default else Any
+    if metadata is not None:
+        fields.update({key: (Optional[optType], Field(default=metadata.default, description=metadata.label))})
+    else:
+        fields.update({key: (Optional[optType], Field())})
+OptionsModel = create_model("Options", **fields)
+flags = {}
+_options = vars(parser)['_option_string_actions']
+for key in _options:
+    if(_options[key].dest != 'help'):
+        flag = _options[key]
+        _type = str
+        if _options[key].default is not None:
+            _type = type(_options[key].default)
+        flags.update({flag.dest: (_type, Field(default=flag.default,})
+FlagsModel = create_model("Flags", **flags)
+class SamplerItem(BaseModel):
+    name: str = Field(title="Name")
+    aliases: List[str] = Field(title="Aliases")
+    options: Dict[str, str] = Field(title="Options")
+class UpscalerItem(BaseModel):
+    name: str = Field(title="Name")
+    model_name: Optional[str] = Field(title="Model Name")
+    model_path: Optional[str] = Field(title="Path")
+    model_url: Optional[str] = Field(title="URL")
+    scale: Optional[float] = Field(title="Scale")
+class LatentUpscalerModeItem(BaseModel):
+    name: str = Field(title="Name")
+class SDModelItem(BaseModel):
+    title: str = Field(title="Title")
+    model_name: str = Field(title="Model Name")
+    hash: Optional[str] = Field(title="Short hash")
+    sha256: Optional[str] = Field(title="sha256 hash")
+    filename: str = Field(title="Filename")
+    config: Optional[str] = Field(title="Config file")
+class SDVaeItem(BaseModel):
+    model_name: str = Field(title="Model Name")
+    filename: str = Field(title="Filename")
+class HypernetworkItem(BaseModel):
+    name: str = Field(title="Name")
+    path: Optional[str] = Field(title="Path")
+class FaceRestorerItem(BaseModel):
+    name: str = Field(title="Name")
+    cmd_dir: Optional[str] = Field(title="Path")
+class RealesrganItem(BaseModel):
+    name: str = Field(title="Name")
+    path: Optional[str] = Field(title="Path")
+    scale: Optional[int] = Field(title="Scale")
+class PromptStyleItem(BaseModel):
+    name: str = Field(title="Name")
+    prompt: Optional[str] = Field(title="Prompt")
+    negative_prompt: Optional[str] = Field(title="Negative Prompt")
+class EmbeddingItem(BaseModel):
+    step: Optional[int] = Field(title="Step", description="The number of steps that were used to train this embedding, if available")
+    sd_checkpoint: Optional[str] = Field(title="SD Checkpoint", description="The hash of the checkpoint this embedding was trained on, if available")
+    sd_checkpoint_name: Optional[str] = Field(title="SD Checkpoint Name", description="The name of the checkpoint this embedding was trained on, if available. Note that this is the name that was used by the trainer; for a stable identifier, use `sd_checkpoint` instead")
+    shape: int = Field(title="Shape", description="The length of each individual vector in the embedding")
+    vectors: int = Field(title="Vectors", description="The number of vectors in the embedding")
+class EmbeddingsResponse(BaseModel):
+    loaded: Dict[str, EmbeddingItem] = Field(title="Loaded", description="Embeddings loaded for the current model")
+    skipped: Dict[str, EmbeddingItem] = Field(title="Skipped", description="Embeddings skipped for the current model (likely due to architecture incompatibility)")
+class MemoryResponse(BaseModel):
+    ram: dict = Field(title="RAM", description="System memory stats")
+    cuda: dict = Field(title="CUDA", description="nVidia CUDA memory stats")
+class ScriptsList(BaseModel):
+    txt2img: list = Field(default=None, title="Txt2img", description="Titles of scripts (txt2img)")
+    img2img: list = Field(default=None, title="Img2img", description="Titles of scripts (img2img)")
+class ScriptArg(BaseModel):
+    label: str = Field(default=None, title="Label", description="Name of the argument in UI")
+    value: Optional[Any] = Field(default=None, title="Value", description="Default value of the argument")
+    minimum: Optional[Any] = Field(default=None, title="Minimum", description="Minimum allowed value for the argumentin UI")
+    maximum: Optional[Any] = Field(default=None, title="Minimum", description="Maximum allowed value for the argumentin UI")
+    step: Optional[Any] = Field(default=None, title="Minimum", description="Step for changing value of the argumentin UI")
+    choices: Optional[List[str]] = Field(default=None, title="Choices", description="Possible values for the argument")
+class ScriptInfo(BaseModel):
+    name: str = Field(default=None, title="Name", description="Script name")
+    is_alwayson: bool = Field(default=None, title="IsAlwayson", description="Flag specifying whether this script is an alwayson script")
+    is_img2img: bool = Field(default=None, title="IsImg2img", description="Flag specifying whether this script is an img2img script")
+    args: List[ScriptArg] = Field(title="Arguments", description="List of script's arguments")

+ 120 - 0

@@ -0,0 +1,120 @@
+import json
+import os.path
+import threading
+import time
+from modules.paths import data_path, script_path
+cache_filename = os.path.join(data_path, "cache.json")
+cache_data = None
+cache_lock = threading.Lock()
+dump_cache_after = None
+dump_cache_thread = None
+def dump_cache():
+    """
+    Marks cache for writing to disk. 5 seconds after no one else flags the cache for writing, it is written.
+    """
+    global dump_cache_after
+    global dump_cache_thread
+    def thread_func():
+        global dump_cache_after
+        global dump_cache_thread
+        while dump_cache_after is not None and time.time() < dump_cache_after:
+            time.sleep(1)
+        with cache_lock:
+            with open(cache_filename, "w", encoding="utf8") as file:
+                json.dump(cache_data, file, indent=4)
+            dump_cache_after = None
+            dump_cache_thread = None
+    with cache_lock:
+        dump_cache_after = time.time() + 5
+        if dump_cache_thread is None:
+            dump_cache_thread = threading.Thread(name='cache-writer', target=thread_func)
+            dump_cache_thread.start()
+def cache(subsection):
+    """
+    Retrieves or initializes a cache for a specific subsection.
+    Parameters:
+        subsection (str): The subsection identifier for the cache.
+    Returns:
+        dict: The cache data for the specified subsection.
+    """
+    global cache_data
+    if cache_data is None:
+        with cache_lock:
+            if cache_data is None:
+                if not os.path.isfile(cache_filename):
+                    cache_data = {}
+                else:
+                    try:
+                        with open(cache_filename, "r", encoding="utf8") as file:
+                            cache_data = json.load(file)
+                    except Exception:
+                        os.replace(cache_filename, os.path.join(script_path, "tmp", "cache.json"))
+                        print('[ERROR] issue occurred while trying to read cache.json, move current cache to tmp/cache.json and create new cache')
+                        cache_data = {}
+    s = cache_data.get(subsection, {})
+    cache_data[subsection] = s
+    return s
+def cached_data_for_file(subsection, title, filename, func):
+    """
+    Retrieves or generates data for a specific file, using a caching mechanism.
+    Parameters:
+        subsection (str): The subsection of the cache to use.
+        title (str): The title of the data entry in the subsection of the cache.
+        filename (str): The path to the file to be checked for modifications.
+        func (callable): A function that generates the data if it is not available in the cache.
+    Returns:
+        dict or None: The cached or generated data, or None if data generation fails.
+    The `cached_data_for_file` function implements a caching mechanism for data stored in files.
+    It checks if the data associated with the given `title` is present in the cache and compares the
+    modification time of the file with the cached modification time. If the file has been modified,
+    the cache is considered invalid and the data is regenerated using the provided `func`.
+    Otherwise, the cached data is returned.
+    If the data generation fails, None is returned to indicate the failure. Otherwise, the generated
+    or cached data is returned as a dictionary.
+    """
+    existing_cache = cache(subsection)
+    ondisk_mtime = os.path.getmtime(filename)
+    entry = existing_cache.get(title)
+    if entry:
+        cached_mtime = entry.get("mtime", 0)
+        if ondisk_mtime > cached_mtime:
+            entry = None
+    if not entry or 'value' not in entry:
+        value = func()
+        if value is None:
+            return None
+        entry = {'mtime': ondisk_mtime, 'value': value}
+        existing_cache[title] = entry
+        dump_cache()
+    return entry['value']

+ 119 - 0

@@ -0,0 +1,119 @@
+from functools import wraps
+import html
+import threading
+import time
+from modules import shared, progress, errors, devices
+queue_lock = threading.Lock()
+def wrap_queued_call(func):
+    def f(*args, **kwargs):
+        with queue_lock:
+            res = func(*args, **kwargs)
+        return res
+    return f
+def wrap_gradio_gpu_call(func, extra_outputs=None):
+    @wraps(func)
+    def f(*args, **kwargs):
+        # if the first argument is a string that says "task(...)", it is treated as a job id
+        if args and type(args[0]) == str and args[0].startswith("task(") and args[0].endswith(")"):
+            id_task = args[0]
+            progress.add_task_to_queue(id_task)
+        else:
+            id_task = None
+        with queue_lock:
+            shared.state.begin(job=id_task)
+            progress.start_task(id_task)
+            try:
+                res = func(*args, **kwargs)
+                progress.record_results(id_task, res)
+            finally:
+                progress.finish_task(id_task)
+            shared.state.end()
+        return res
+    return wrap_gradio_call(f, extra_outputs=extra_outputs, add_stats=True)
+def wrap_gradio_call(func, extra_outputs=None, add_stats=False):
+    @wraps(func)
+    def f(*args, extra_outputs_array=extra_outputs, **kwargs):
+        run_memmon = shared.opts.memmon_poll_rate > 0 and not shared.mem_mon.disabled and add_stats
+        if run_memmon:
+            shared.mem_mon.monitor()
+        t = time.perf_counter()
+        try:
+            res = list(func(*args, **kwargs))
+        except Exception as e:
+            # When printing out our debug argument list,
+            # do not print out more than a 100 KB of text
+            max_debug_str_len = 131072
+            message = "Error completing request"
+            arg_str = f"Arguments: {args} {kwargs}"[:max_debug_str_len]
+            if len(arg_str) > max_debug_str_len:
+                arg_str += f" (Argument list truncated at {max_debug_str_len}/{len(arg_str)} characters)"
+  "{message}\n{arg_str}", exc_info=True)
+            shared.state.job = ""
+            shared.state.job_count = 0
+            if extra_outputs_array is None:
+                extra_outputs_array = [None, '']
+            error_message = f'{type(e).__name__}: {e}'
+            res = extra_outputs_array + [f"<div class='error'>{html.escape(error_message)}</div>"]
+        devices.torch_gc()
+        shared.state.skipped = False
+        shared.state.interrupted = False
+        shared.state.job_count = 0
+        if not add_stats:
+            return tuple(res)
+        elapsed = time.perf_counter() - t
+        elapsed_m = int(elapsed // 60)
+        elapsed_s = elapsed % 60
+        elapsed_text = f"{elapsed_s:.1f} sec."
+        if elapsed_m > 0:
+            elapsed_text = f"{elapsed_m} min. "+elapsed_text
+        if run_memmon:
+            mem_stats = {k: -(v//-(1024*1024)) for k, v in shared.mem_mon.stop().items()}
+            active_peak = mem_stats['active_peak']
+            reserved_peak = mem_stats['reserved_peak']
+            sys_peak = mem_stats['system_peak']
+            sys_total = mem_stats['total']
+            sys_pct = sys_peak/max(sys_total, 1) * 100
+            toltip_a = "Active: peak amount of video memory used during generation (excluding cached data)"
+            toltip_r = "Reserved: total amout of video memory allocated by the Torch library "
+            toltip_sys = "System: peak amout of video memory allocated by all running programs, out of total capacity"
+            text_a = f"<abbr title='{toltip_a}'>A</abbr>: <span class='measurement'>{active_peak/1024:.2f} GB</span>"
+            text_r = f"<abbr title='{toltip_r}'>R</abbr>: <span class='measurement'>{reserved_peak/1024:.2f} GB</span>"
+            text_sys = f"<abbr title='{toltip_sys}'>Sys</abbr>: <span class='measurement'>{sys_peak/1024:.1f}/{sys_total/1024:g} GB</span> ({sys_pct:.1f}%)"
+            vram_html = f"<p class='vram'>{text_a}, <wbr>{text_r}, <wbr>{text_sys}</p>"
+        else:
+            vram_html = ''
+        # last item is always HTML
+        res[-1] += f"<div class='performance'><p class='time'>Time taken: <wbr><span class='measurement'>{elapsed_text}</span></p>{vram_html}</div>"
+        return tuple(res)
+    return f

+ 112 - 0

@@ -0,0 +1,112 @@
+import argparse
+import json
+import os
+from modules.paths_internal import models_path, script_path, data_path, extensions_dir, extensions_builtin_dir, sd_default_config, sd_model_file  # noqa: F401
+parser = argparse.ArgumentParser()
+parser.add_argument("-f", action='store_true', help=argparse.SUPPRESS)  # allows running as root; implemented outside of webui
+parser.add_argument("--update-all-extensions", action='store_true', help=" argument: download updates for all extensions when starting the program")
+parser.add_argument("--skip-python-version-check", action='store_true', help=" argument: do not check python version")
+parser.add_argument("--skip-torch-cuda-test", action='store_true', help=" argument: do not check if CUDA is able to work properly")
+parser.add_argument("--reinstall-xformers", action='store_true', help=" argument: install the appropriate version of xformers even if you have some version already installed")
+parser.add_argument("--reinstall-torch", action='store_true', help=" argument: install the appropriate version of torch even if you have some version already installed")
+parser.add_argument("--update-check", action='store_true', help=" argument: check for updates at startup")
+parser.add_argument("--test-server", action='store_true', help=" argument: configure server for testing")
+parser.add_argument("--skip-prepare-environment", action='store_true', help=" argument: skip all environment preparation")
+parser.add_argument("--skip-install", action='store_true', help=" argument: skip installation of packages")
+parser.add_argument("--do-not-download-clip", action='store_true', help="do not download CLIP model even if it's not included in the checkpoint")
+parser.add_argument("--data-dir", type=str, default=os.path.dirname(os.path.dirname(os.path.realpath(__file__))), help="base path where all user data is stored")
+parser.add_argument("--config", type=str, default=sd_default_config, help="path to config which constructs model",)
+parser.add_argument("--ckpt", type=str, default=sd_model_file, help="path to checkpoint of stable diffusion model; if specified, this checkpoint will be added to the list of checkpoints and loaded",)
+parser.add_argument("--ckpt-dir", type=str, default=None, help="Path to directory with stable diffusion checkpoints")
+parser.add_argument("--vae-dir", type=str, default=None, help="Path to directory with VAE files")
+parser.add_argument("--gfpgan-dir", type=str, help="GFPGAN directory", default=('./src/gfpgan' if os.path.exists('./src/gfpgan') else './GFPGAN'))
+parser.add_argument("--gfpgan-model", type=str, help="GFPGAN model file name", default=None)
+parser.add_argument("--no-half", action='store_true', help="do not switch the model to 16-bit floats")
+parser.add_argument("--no-half-vae", action='store_true', help="do not switch the VAE model to 16-bit floats")
+parser.add_argument("--no-progressbar-hiding", action='store_true', help="do not hide progressbar in gradio UI (we hide it because it slows down ML if you have hardware acceleration in browser)")
+parser.add_argument("--max-batch-count", type=int, default=16, help="maximum batch count value for the UI")
+parser.add_argument("--embeddings-dir", type=str, default=os.path.join(data_path, 'embeddings'), help="embeddings directory for textual inversion (default: embeddings)")
+parser.add_argument("--textual-inversion-templates-dir", type=str, default=os.path.join(script_path, 'textual_inversion_templates'), help="directory with textual inversion templates")
+parser.add_argument("--hypernetwork-dir", type=str, default=os.path.join(models_path, 'hypernetworks'), help="hypernetwork directory")
+parser.add_argument("--localizations-dir", type=str, default=os.path.join(script_path, 'localizations'), help="localizations directory")
+parser.add_argument("--allow-code", action='store_true', help="allow custom script execution from webui")
+parser.add_argument("--medvram", action='store_true', help="enable stable diffusion model optimizations for sacrificing a little speed for low VRM usage")
+parser.add_argument("--lowvram", action='store_true', help="enable stable diffusion model optimizations for sacrificing a lot of speed for very low VRM usage")
+parser.add_argument("--lowram", action='store_true', help="load stable diffusion checkpoint weights to VRAM instead of RAM")
+parser.add_argument("--always-batch-cond-uncond", action='store_true', help="disables cond/uncond batching that is enabled to save memory with --medvram or --lowvram")
+parser.add_argument("--unload-gfpgan", action='store_true', help="does not do anything.")
+parser.add_argument("--precision", type=str, help="evaluate at this precision", choices=["full", "autocast"], default="autocast")
+parser.add_argument("--upcast-sampling", action='store_true', help="upcast sampling. No effect with --no-half. Usually produces similar results to --no-half with better performance while using less memory.")
+parser.add_argument("--share", action='store_true', help="use share=True for gradio and make the UI accessible through their site")
+parser.add_argument("--ngrok", type=str, help="ngrok authtoken, alternative to gradio --share", default=None)
+parser.add_argument("--ngrok-region", type=str, help="does not do anything.", default="")
+parser.add_argument("--ngrok-options", type=json.loads, help='The options to pass to ngrok in JSON format, e.g.: \'{"authtoken_from_env":true, "basic_auth":"user:password", "oauth_provider":"google", "oauth_allow_emails":""}\'', default=dict())
+parser.add_argument("--enable-insecure-extension-access", action='store_true', help="enable extensions tab regardless of other options")
+parser.add_argument("--codeformer-models-path", type=str, help="Path to directory with codeformer model file(s).", default=os.path.join(models_path, 'Codeformer'))
+parser.add_argument("--gfpgan-models-path", type=str, help="Path to directory with GFPGAN model file(s).", default=os.path.join(models_path, 'GFPGAN'))
+parser.add_argument("--esrgan-models-path", type=str, help="Path to directory with ESRGAN model file(s).", default=os.path.join(models_path, 'ESRGAN'))
+parser.add_argument("--bsrgan-models-path", type=str, help="Path to directory with BSRGAN model file(s).", default=os.path.join(models_path, 'BSRGAN'))
+parser.add_argument("--realesrgan-models-path", type=str, help="Path to directory with RealESRGAN model file(s).", default=os.path.join(models_path, 'RealESRGAN'))
+parser.add_argument("--clip-models-path", type=str, help="Path to directory with CLIP model file(s).", default=None)
+parser.add_argument("--xformers", action='store_true', help="enable xformers for cross attention layers")
+parser.add_argument("--force-enable-xformers", action='store_true', help="enable xformers for cross attention layers regardless of whether the checking code thinks you can run it; do not make bug reports if this fails to work")
+parser.add_argument("--xformers-flash-attention", action='store_true', help="enable xformers with Flash Attention to improve reproducibility (supported for SD2.x or variant only)")
+parser.add_argument("--deepdanbooru", action='store_true', help="does not do anything")
+parser.add_argument("--opt-split-attention", action='store_true', help="prefer Doggettx's cross-attention layer optimization for automatic choice of optimization")
+parser.add_argument("--opt-sub-quad-attention", action='store_true', help="prefer memory efficient sub-quadratic cross-attention layer optimization for automatic choice of optimization")
+parser.add_argument("--sub-quad-q-chunk-size", type=int, help="query chunk size for the sub-quadratic cross-attention layer optimization to use", default=1024)
+parser.add_argument("--sub-quad-kv-chunk-size", type=int, help="kv chunk size for the sub-quadratic cross-attention layer optimization to use", default=None)
+parser.add_argument("--sub-quad-chunk-threshold", type=int, help="the percentage of VRAM threshold for the sub-quadratic cross-attention layer optimization to use chunking", default=None)
+parser.add_argument("--opt-split-attention-invokeai", action='store_true', help="prefer InvokeAI's cross-attention layer optimization for automatic choice of optimization")
+parser.add_argument("--opt-split-attention-v1", action='store_true', help="prefer older version of split attention optimization for automatic choice of optimization")
+parser.add_argument("--opt-sdp-attention", action='store_true', help="prefer scaled dot product cross-attention layer optimization for automatic choice of optimization; requires PyTorch 2.*")
+parser.add_argument("--opt-sdp-no-mem-attention", action='store_true', help="prefer scaled dot product cross-attention layer optimization without memory efficient attention for automatic choice of optimization, makes image generation deterministic; requires PyTorch 2.*")
+parser.add_argument("--disable-opt-split-attention", action='store_true', help="prefer no cross-attention layer optimization for automatic choice of optimization")
+parser.add_argument("--disable-nan-check", action='store_true', help="do not check if produced images/latent spaces have nans; useful for running without a checkpoint in CI")
+parser.add_argument("--use-cpu", nargs='+', help="use CPU as torch device for specified modules", default=[], type=str.lower)
+parser.add_argument("--listen", action='store_true', help="launch gradio with as server name, allowing to respond to network requests")
+parser.add_argument("--port", type=int, help="launch gradio with given server port, you need root/admin rights for ports < 1024, defaults to 7860 if available", default=None)
+parser.add_argument("--show-negative-prompt", action='store_true', help="does not do anything", default=False)
+parser.add_argument("--ui-config-file", type=str, help="filename to use for ui configuration", default=os.path.join(data_path, 'ui-config.json'))
+parser.add_argument("--hide-ui-dir-config", action='store_true', help="hide directory configuration from webui", default=False)
+parser.add_argument("--freeze-settings", action='store_true', help="disable editing settings", default=False)
+parser.add_argument("--ui-settings-file", type=str, help="filename to use for ui settings", default=os.path.join(data_path, 'config.json'))
+parser.add_argument("--gradio-debug",  action='store_true', help="launch gradio with --debug option")
+parser.add_argument("--gradio-auth", type=str, help='set gradio authentication like "username:password"; or comma-delimit multiple like "u1:p1,u2:p2,u3:p3"', default=None)
+parser.add_argument("--gradio-auth-path", type=str, help='set gradio authentication file path ex. "/path/to/auth/file" same auth format as --gradio-auth', default=None)
+parser.add_argument("--gradio-img2img-tool", type=str, help='does not do anything')
+parser.add_argument("--gradio-inpaint-tool", type=str, help="does not do anything")
+parser.add_argument("--gradio-allowed-path", action='append', help="add path to gradio's allowed_paths, make it possible to serve files from it")
+parser.add_argument("--opt-channelslast", action='store_true', help="change memory type for stable diffusion to channels last")
+parser.add_argument("--styles-file", type=str, help="filename to use for styles", default=os.path.join(data_path, 'styles.csv'))
+parser.add_argument("--autolaunch", action='store_true', help="open the webui URL in the system's default browser upon launch", default=False)
+parser.add_argument("--theme", type=str, help="launches the UI with light or dark theme", default=None)
+parser.add_argument("--use-textbox-seed", action='store_true', help="use textbox for seeds in UI (no up/down, but possible to input long seeds)", default=False)
+parser.add_argument("--disable-console-progressbars", action='store_true', help="do not output progressbars to console", default=False)
+parser.add_argument("--enable-console-prompts", action='store_true', help="print prompts to console when generating with txt2img and img2img", default=False)
+parser.add_argument('--vae-path', type=str, help='Checkpoint to use as VAE; setting this argument disables all settings related to VAE', default=None)
+parser.add_argument("--disable-safe-unpickle", action='store_true', help="disable checking pytorch models for malicious code", default=False)
+parser.add_argument("--api", action='store_true', help="use api=True to launch the API together with the webui (use --nowebui instead for only the API)")
+parser.add_argument("--api-auth", type=str, help='Set authentication for API like "username:password"; or comma-delimit multiple like "u1:p1,u2:p2,u3:p3"', default=None)
+parser.add_argument("--api-log", action='store_true', help="use api-log=True to enable logging of all API requests")
+parser.add_argument("--nowebui", action='store_true', help="use api=True to launch the API instead of the webui")
+parser.add_argument("--ui-debug-mode", action='store_true', help="Don't load model to quickly launch UI")
+parser.add_argument("--device-id", type=str, help="Select the default CUDA device to use (export CUDA_VISIBLE_DEVICES=0,1,etc might be needed before)", default=None)
+parser.add_argument("--administrator", action='store_true', help="Administrator rights", default=False)
+parser.add_argument("--cors-allow-origins", type=str, help="Allowed CORS origin(s) in the form of a comma-separated list (no spaces)", default=None)
+parser.add_argument("--cors-allow-origins-regex", type=str, help="Allowed CORS origin(s) in the form of a single regular expression", default=None)
+parser.add_argument("--tls-keyfile", type=str, help="Partially enables TLS, requires --tls-certfile to fully function", default=None)
+parser.add_argument("--tls-certfile", type=str, help="Partially enables TLS, requires --tls-keyfile to fully function", default=None)
+parser.add_argument("--disable-tls-verify", action="store_false", help="When passed, enables the use of self-signed certificates.", default=None)
+parser.add_argument("--server-name", type=str, help="Sets hostname of server", default=None)
+parser.add_argument("--gradio-queue", action='store_true', help="does not do anything", default=True)
+parser.add_argument("--no-gradio-queue", action='store_true', help="Disables gradio queue; causes the webpage to use http requests instead of websockets; was the defaul in earlier versions")
+parser.add_argument("--skip-version-check", action='store_true', help="Do not check versions of torch and xformers")
+parser.add_argument("--no-hashing", action='store_true', help="disable sha256 hashing of checkpoints to help loading performance", default=False)
+parser.add_argument("--no-download-sd-model", action='store_true', help="don't download SD1.5 model even if no model is found in --ckpt-dir", default=False)
+parser.add_argument('--subpath', type=str, help='customize the subpath for gradio, use with reverse proxy')
+parser.add_argument('--add-stop-route', action='store_true', help='add /_stop route to stop server')
+parser.add_argument('--api-server-stop', action='store_true', help='enable server stop/restart/kill via api')
+parser.add_argument('--timeout-keep-alive', type=int, default=30, help='set timeout_keep_alive for uvicorn')

Some files were not shown because too many files changed in this diff