Skip to content

Commit

Permalink
Subprocess IPC improvements (#204)
Browse files Browse the repository at this point in the history
* redesigned backend to frontend ipc

* adding docstring
  • Loading branch information
NullSenseStudio authored Oct 2, 2022
1 parent 16d03ad commit 39ed56c
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 87 deletions.
186 changes: 102 additions & 84 deletions generator_process.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import json
from math import ceil, log
import subprocess
import sys
import os
import threading
import site
import traceback
import numpy as np
from enum import IntEnum as Lawsuit, auto

MISSING_DEPENDENCIES_ERROR = "Python dependencies are missing. Click Download Latest Release to fix."

# IPC message types from subprocess
class Action(Lawsuit): # can't help myself
UNKNOWN = -1
CLOSED = 0
"""IPC message types sent from backend to frontend"""
UNKNOWN = -1 # placeholder so you can do Action(int).name or Action(int) == Action.UNKNOWN when int is invalid
# don't add anymore negative actions
CLOSED = 0 # is not sent during normal operation, just allows for a simple way of detecting when the subprocess is closed
INFO = 1
IMAGE = 2
STEP_IMAGE = 3
Expand All @@ -23,6 +26,8 @@ class Action(Lawsuit): # can't help myself
def _missing_(cls, value):
return cls.UNKNOWN

ACTION_BYTE_LENGTH = ceil(log(max(Action)+1,256)) # doubt there will ever be more than 255 actions, but just in case

class GeneratorProcess():
def __init__(self):
self.process = subprocess.Popen([sys.executable,'generator_process.py'],cwd=os.path.dirname(os.path.realpath(__file__)),stdin=subprocess.PIPE,stdout=subprocess.PIPE)
Expand Down Expand Up @@ -60,58 +65,51 @@ def prompt2image(self, args, step_callback, image_callback, info_callback, excep
yield # nothing in queue, let blender resume
tup = queue.pop()
action = tup[0]
callbacks[action](*tup[1:])
callbacks[action](**tup[1])
if action == Action.IMAGE:
break
elif action == Action.EXCEPTION:
return

def _run(self):
reader = self.reader
def readStr():
return str(reader.read(readUInt(4)), encoding='utf-8')
def readUInt(length):
return int.from_bytes(reader.read(length),sys.byteorder,signed=False)

queue = self.queue
def queue_exception(fatal, err):
queue.append((Action.EXCEPTION, fatal, err))
def queue_exception_msg(msg):
queue.append((Action.EXCEPTION, {'fatal': True, 'msg': msg, 'trace': None}))

image_buffer = bytearray(512*512*16)
while not self.killed:
action = readUInt(1)
action = readUInt(ACTION_BYTE_LENGTH)
if action == Action.CLOSED:
if not self.killed:
queue_exception(True, "Process closed unexpectedly")
queue_exception_msg("Process closed unexpectedly")
return
elif action == Action.INFO:
queue.append((action, readStr()))
elif action == Action.IMAGE or action == Action.STEP_IMAGE:
seed = readUInt(4)
width = readUInt(4)
height = readUInt(4)
length = width*height*16
import math
w = math.floor(self.args['width']/64)*64 # stable diffusion rounds down internally
h = math.floor(self.args['height']/64)*64
if width != w or height != h:
queue_exception(True, f"Internal error, received image of wrong resolution {width}x{height}, expected {w}x{h}")
kwargs_len = readUInt(8)
kwargs = {} if kwargs_len == 0 else json.loads(reader.read(kwargs_len))
payload_len = readUInt(8)

if action in [Action.INFO, Action.STEP_NO_SHOW]:
queue.append((action, kwargs))
elif action in [Action.IMAGE, Action.STEP_IMAGE]:
expected_len = kwargs['width']*kwargs['height']*16
if payload_len != expected_len:
queue_exception_msg(f"Internal error, received image payload of {payload_len} bytes, expected {expected_len} bytes for {kwargs['width']}x{kwargs['height']} image")
return
if length > len(image_buffer):
image_buffer = bytearray(length)
m = memoryview(image_buffer)[:length]
if expected_len > len(image_buffer):
image_buffer = bytearray(expected_len)
m = memoryview(image_buffer)[:expected_len]
reader.readinto(m)
image = np.frombuffer(m,dtype=np.float32)
queue.append((action, seed, width, height, image))
elif action == Action.STEP_NO_SHOW:
queue.append((action, readUInt(4)))
kwargs['pixels'] = np.frombuffer(m,dtype=np.float32)
queue.append((action, kwargs))
elif action == Action.EXCEPTION:
fatal = readUInt(1) != 0
queue_exception(fatal, readStr())
if fatal:
queue.append((action, kwargs))
if kwargs['fatal']:
return
else:
queue_exception(True, f"Internal error, unexpected action id: {action}")
queue_exception_msg(f"Internal error, unexpected action id: {action}")
return

def main():
Expand All @@ -120,25 +118,60 @@ def main():
sys.stdout = open(os.devnull, 'w') # prevent stable diffusion logs from breaking ipc
stderr = sys.stderr

def writeUInt(length, value):
stdout.write(value.to_bytes(length,sys.byteorder,signed=False))
def send_action(action, *, payload = None, **kwargs):
"""Sends action messages to frontend.
def writeStr(string):
b = bytes(string,encoding='utf-8')
writeUInt(4,len(b))
stdout.write(b)
Arguments:
* action -- Action enum or int
* payload -- Bytes-like value that is not suitable for json
* **kwargs -- json serializable key-value pairs used for callback function arguments
"""
if Action(action) == Action.UNKNOWN:
raise ValueError(f"Internal error, invalid Action: {action}")
kwargs_len = payload_len = b'\x00'*8
if kwargs:
kwargs = bytes(json.dumps(kwargs), encoding='utf-8')
kwargs_len = len(kwargs).to_bytes(len(kwargs_len), sys.byteorder, signed=False)
if payload is not None:
payload = memoryview(payload)
payload_len = len(payload).to_bytes(len(payload_len), sys.byteorder, signed=False)
# keep all checks before writing so ipc doesn't get broken actions

def writeInfo(msg):
writeUInt(1,Action.INFO)
writeStr(msg)
stdout.flush()
def split_write(mv):
for i in range(0,len(mv),1024*64):
stdout.write(bytes(mv[i:i+1024*64])) # writing fails when using memoryview slices directly, wrap byte() first
# stdout.write(bytes(mv)) # writing full image has caused the subprocess to crash without raising any exception, safer not to use

def writeException(fatal, e):
writeUInt(1,Action.EXCEPTION)
writeUInt(1,1 if fatal else 0)
writeStr(e)
stdout.write(action.to_bytes(ACTION_BYTE_LENGTH, sys.byteorder, signed=False))
stdout.write(kwargs_len)
if kwargs:
split_write(kwargs)
stdout.write(payload_len)
if payload:
split_write(payload)
stdout.flush()

def send_info(msg):
"""Sends information to be shown to the user before generation begins."""
send_action(Action.INFO, msg=msg)

def send_exception(fatal = True, msg: str = None, trace: str = None):
"""Send exception information to frontend. When called within an except block arguments can be inferred.
Arguments:
* fatal -- whether the subprocess should be killed
* msg -- user notified prompt
* trace -- traceback string
"""
exc = sys.exc_info()
if msg is None:
msg = repr(exc[1]) if exc[1] is not None else "Internal error, see system console for details"
if trace is None and exc[2] is not None:
trace = traceback.format_exc()
if msg is None and trace is None:
raise TypeError("msg and trace cannot be None outside of an except block")
send_action(Action.EXCEPTION, fatal=fatal, msg=msg, trace=trace)

try:
from absolute_path import absolute_path
# Support Apple Silicon GPUs as much as possible.
Expand All @@ -160,13 +193,12 @@ def writeException(fatal, e):
min_files = 10 # bump this up if more files get added to .python_dependencies in source
# don't set too high so it can still pass info on individual missing modules
if not os.path.exists(".python_dependencies") or len(os.listdir()) < min_files:
e = MISSING_DEPENDENCIES_ERROR
send_exception(msg=MISSING_DEPENDENCIES_ERROR)
else:
e = repr(e)
writeException(True, e)
send_exception()
return
except Exception as e:
writeException(True, repr(e))
except:
send_exception()
return

models_config = absolute_path('stable_diffusion/configs/models.yaml')
Expand All @@ -177,34 +209,20 @@ def writeException(fatal, e):
weights = absolute_path('stable_diffusion/' + models[model].weights)

byte_to_normalized = 1.0 / 255.0
def write_pixels(image):
writeUInt(4,image.width)
writeUInt(4,image.height)
b = (np.asarray(ImageOps.flip(image).convert('RGBA'),dtype=np.float32) * byte_to_normalized).tobytes()
for i in range(0,len(b),1024*64):
stdout.write(b[i:i+1024*64])
# stdout.write(memoryview(b)[i:i+1024*64]) # won't accept memoryview for some reason, writer thinks it needs serialized but fails
# stdout.write(b) # writing full image has caused the subprocess to crash without raising any exception, safer not to use
stdout.flush()
def image_to_bytes(image):
return (np.asarray(ImageOps.flip(image).convert('RGBA'),dtype=np.float32) * byte_to_normalized).tobytes()

def image_writer(image, seed, upscaled=False):
# Only use the non-upscaled texture, as upscaling is currently unsupported by the addon.
if not upscaled:
writeUInt(1,Action.IMAGE)
writeUInt(4,seed)
write_pixels(image)
stdout.flush()
send_action(Action.IMAGE, payload=image_to_bytes(image), seed=seed, width=image.width, height=image.height)

def view_step(samples, step):
if args['show_steps']:
pixels = generator._sample_to_image(samples) # May run out of memory, keep before any writing
writeUInt(1,Action.STEP_IMAGE)
writeUInt(4,step)
write_pixels(pixels)
image = generator._sample_to_image(samples)
send_action(Action.STEP_IMAGE, payload=image_to_bytes(image), step=step, width=image.width, height=image.height)
else:
writeUInt(1,Action.STEP_NO_SHOW)
writeUInt(4,step)
stdout.flush()
send_action(Action.STEP_NO_SHOW, step=step)

def preload_models():
tqdm = None
Expand All @@ -222,7 +240,7 @@ def preload_models():
def start_preloading(model_name):
nonlocal current_model_name
current_model_name = model_name
writeInfo(f"Downloading {model_name} (0%)")
send_info(f"Downloading {model_name} (0%)")

def update_decorator(original):
def update(self, n=1):
Expand All @@ -231,7 +249,7 @@ def update(self, n=1):
frac = self.n / self.total
percentage = int(frac * 100)
if self.n - self.last_print_n >= self.miniters:
writeInfo(f"Downloading {current_model_name} ({percentage}%)")
send_info(f"Downloading {current_model_name} ({percentage}%)")
return result
return update
old_update = tqdm.update
Expand All @@ -244,7 +262,7 @@ def update(self, n=1):
start_preloading("BERT tokenizer")
transformers.BertTokenizerFast.from_pretrained('bert-base-uncased')

writeInfo("Preloading `kornia` requirements")
send_info("Preloading `kornia` requirements")
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=DeprecationWarning)
import kornia
Expand All @@ -269,7 +287,7 @@ def update(self, n=1):
args = json.loads(stdin.read(json_len))

if generator is None or (generator.full_precision != args['full_precision'] and sys.platform != 'darwin'):
writeInfo("Loading Model")
send_info("Loading Model")
try:
generator = Generate(
conf=models_config,
Expand All @@ -280,10 +298,10 @@ def update(self, n=1):
full_precision=args['full_precision']
)
generator.load_model()
except Exception as e:
writeException(True, repr(e))
except:
send_exception()
return
writeInfo("Starting")
send_info("Starting")

try:
tmp_stderr = sys.stderr = StringIO() # prompt2image writes exceptions straight to stderr, intercepting
Expand All @@ -303,14 +321,14 @@ def update(self, n=1):
import re
low_ram = re.search(r"(Not enough memory, use lower resolution)( \(max approx. \d+x\d+\))",s,re.IGNORECASE)
if low_ram:
writeException(False, f"{low_ram[1]}{' or disable full precision' if args['full_precision'] else ''}{low_ram[2]}")
send_exception(False, f"{low_ram[1]}{' or disable full precision' if args['full_precision'] else ''}{low_ram[2]}", s)
elif s.find("CUDA out of memory. Tried to allocate") != -1:
writeException(False, f"Not enough memory, use lower resolution{' or disable full precision' if args['full_precision'] else ''}")
send_exception(False, f"Not enough memory, use lower resolution{' or disable full precision' if args['full_precision'] else ''}", s)
else:
writeException(True, s) # consider all unknown exceptions to be fatal so the generator process is fully restarted next time
send_exception(True, msg=None, trace=s) # consider all unknown exceptions to be fatal so the generator process is fully restarted next time
return
except Exception as e:
writeException(True, repr(e))
except:
send_exception()
return
finally:
sys.stderr = stderr
Expand Down
9 changes: 6 additions & 3 deletions operators/dream_texture.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from importlib.resources import path
import sys
import bpy
import os
import math
Expand Down Expand Up @@ -64,12 +65,14 @@ def execute(self, context):
def info(msg=""):
scene.dream_textures_info = msg

def handle_exception(fatal, err):
def handle_exception(fatal, msg, trace):
info() # clear variable
if fatal:
kill_generator()
self.report({'ERROR'},err)
if err == MISSING_DEPENDENCIES_ERROR:
self.report({'ERROR'},msg)
if trace:
print(trace, file=sys.stderr)
if msg == MISSING_DEPENDENCIES_ERROR:
from .open_latest_version import do_force_show_download
do_force_show_download()

Expand Down

0 comments on commit 39ed56c

Please sign in to comment.