From 6422e86ceeb0a1e210502d8ef0dfbc582378d187 Mon Sep 17 00:00:00 2001 From: Nurgelrot Date: Tue, 3 Mar 2026 09:11:22 -0700 Subject: [PATCH 1/2] Update DuetWebAPI.py --- drivers/DuetWebAPI.py | 150 +++++++++++++++++++++++------------------- 1 file changed, 83 insertions(+), 67 deletions(-) diff --git a/drivers/DuetWebAPI.py b/drivers/DuetWebAPI.py index 16d55d5..20cade6 100644 --- a/drivers/DuetWebAPI.py +++ b/drivers/DuetWebAPI.py @@ -128,141 +128,157 @@ class printerAPI: # - UnknownController: if fails to connect def __init__(self, baseURL, nickname='Default', password='reprap'): _logger.debug('Starting DuetWebAPI..') - # parse input parameters - # convert hostname into IP - # fetch IP address + # Convert hostname into IP u = urlparse(baseURL) hostIP = socket.gethostbyname(u.hostname) baseURL = u.scheme + '://' + hostIP if(u.port is not None): baseURL += ':' + str(u.port) - # set base parameters + # Set base parameters self._base_url = baseURL self._password = password self._nickname = nickname self._tools = [] - # Name as defined in RRF config.g file self._name = 'My Duet' - # set up session parameters + + # Set up session parameters self.session = requests.Session() self.retry = Retry(connect=3, backoff_factor=0.4) self.adapter = HTTPAdapter(max_retries=self.retry) self.session.mount('http://', self.adapter) + try: - # check if its a Duet 2 board - # Set up session using password - if(self._password != "reprap"): - _logger.debug('Starting DuetWebAPI session..') - URL=(f'{self._base_url}'+'/rr_connect?password=' + self._password) - r = self.session.get(URL, timeout=(self._requestTimeout,self._responseTimeout)) - - URL=(f'{self._base_url}'+'/rr_status?type=2') - r = self.session.get(URL, timeout=(self._requestTimeout,self._responseTimeout)) - if(r.ok): + # --------------------------------------------------------------- + # PATH 1: Duet 2 / RRF legacy rr_ API + # ALWAYS call rr_connect regardless of password. + # RRF 3.6+ requires this even with the default 'reprap' password, + # and returns a sessionKey that must be used for all subsequent requests. + # --------------------------------------------------------------- + _logger.debug('Attempting rr_connect..') + URL = (f'{self._base_url}' + '/rr_connect?password=' + self._password) + r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) + + if not r.ok: + # HTTP-level failure (e.g. 401 Unauthorized) - try Duet 3 SBC path + _logger.debug('rr_connect HTTP error, trying Duet 3 SBC path..') + raise DuetSBCHandler + + connect_obj = json.loads(r.text) + + # err != 0 means wrong password or board rejected connection + if connect_obj.get('err', 1) != 0: + _logger.debug('rr_connect returned err != 0, trying Duet 3 SBC path..') + raise DuetSBCHandler + + # RRF 3.6+ introduced apiLevel 2 and sessionKey requirement. + # Extract and apply sessionKey to all subsequent requests if present. + api_level = connect_obj.get('apiLevel', 1) + if api_level >= 2: + session_key = connect_obj.get('sessionKey') + if session_key is not None: + self.session.headers.update({'X-Session-Key': str(session_key)}) + _logger.debug(f'RRF apiLevel {api_level} detected - sessionKey {session_key} applied to session headers') + + # Fetch full machine status + URL = (f'{self._base_url}' + '/rr_status?type=2') + r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) + if r.ok: j = json.loads(r.text) - else: + else: raise DuetSBCHandler # Send reply to clear buffer - replyURL = (f'{self._base_url}'+'/rr_reply') - r = self.session.get(replyURL, timeout=(self._requestTimeout,self._responseTimeout)) - + replyURL = (f'{self._base_url}' + '/rr_reply') + r = self.session.get(replyURL, timeout=(self._requestTimeout, self._responseTimeout)) + # Get machine name self._name = j['name'] + # Setup tool definitions toolData = j['tools'] for inputTool in toolData: tempTool = Tool( - number = inputTool['number'], - name = inputTool['name'], - offsets={'X': inputTool['offsets'][0], 'Y': inputTool['offsets'][1], 'Z':inputTool['offsets'][2]}) + number=inputTool['number'], + name=inputTool['name'], + offsets={'X': inputTool['offsets'][0], 'Y': inputTool['offsets'][1], 'Z': inputTool['offsets'][2]}) self._tools.append(tempTool) _logger.debug('Added tool: ' + str(tempTool.getJSON())) - - # Check for firmware version + + # Check firmware version firmwareName = j['firmwareName'] - # fetch hardware board type from firmware name, character 24 boardVersion = firmwareName[24] self._firmwareVersion = j['firmwareVersion'] - # set RRF version based on results + if self._firmwareVersion[0] == "2": - # Duet running RRF v2 self._rrf2 = True self.pt = 2 - else: - # Duet 2 hardware running RRF v3 + else: self._rrf2 = False self.pt = 2 - _logger.info(' .. connected to '+ firmwareName + '- V'+ self._firmwareVersion + '..') + + _logger.info(' .. connected to ' + firmwareName + ' - V' + self._firmwareVersion + '..') return + except DuetSBCHandler as sbc: - # We're probably dealing with a Duet 3 controller, get required firmware info + # --------------------------------------------------------------- + # PATH 2: Duet 3 SBC / DSF API (/machine/ endpoints) + # --------------------------------------------------------------- try: - _logger.debug('Trying to connect to Duet 3 board..') - # Set up session using password - URL=(f'{self._base_url}'+'/machine/connect?password=' + self._password) - r = self.session.get(URL, timeout=(self._requestTimeout,self._responseTimeout)) - # Get session key + _logger.debug('Trying to connect to Duet 3 SBC board..') + URL = (f'{self._base_url}' + '/machine/connect?password=' + self._password) + r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) + r_obj = json.loads(r.text) self._sessionKey = r_obj['sessionKey'] - self.session.headers = {'X-Session-Key': self._sessionKey } - - URL=(f'{self._base_url}'+'/machine/status') - r = self.session.get(URL, timeout=(self._requestTimeout,self._responseTimeout)) + self.session.headers = {'X-Session-Key': self._sessionKey} + + URL = (f'{self._base_url}' + '/machine/status') + r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) _logger.debug('Got reply, parsing again..') j = json.loads(r.text) - _=j firmwareName = j['boards'][0]['firmwareName'] firmwareVersion = j['boards'][0]['firmwareVersion'] self.pt = 3 - # Setup tool definitions toolData = j['tools'] toolCounter = 0 for inputTool in toolData: - # attempt to read tool number from config - # redirect exception to a default toolnumber based on the current index in the array try: toolNumber = inputTool['number'] - except Exception as noNumber: + except Exception: toolNumber = str(toolCounter) - - # attempt to read tool name from config - # redirect exception to a default toolname in the format Tx (where x is the tool number) try: toolName = inputTool['name'] - except Exception as noname: - toolName = "T" + toolNumber - - # increment current tool number - toolCounter = toolCounter + 1 - + except Exception: + toolName = "T" + str(toolNumber) + toolCounter += 1 tempTool = Tool( - number = toolNumber, - name = toolName, - offsets={'X': inputTool['offsets'][0], 'Y': inputTool['offsets'][1], 'Z':inputTool['offsets'][2]}) + number=toolNumber, + name=toolName, + offsets={'X': inputTool['offsets'][0], 'Y': inputTool['offsets'][1], 'Z': inputTool['offsets'][2]}) self._tools.append(tempTool) - - _logger.debug('Duet 3 board detected') - _logger.info(' .. connected to: '+ firmwareName + '- V'+firmwareVersion + '..') + + _logger.debug('Duet 3 SBC board detected') + _logger.info(' .. connected to: ' + firmwareName + ' - V' + firmwareVersion + '..') return + except: - # The board is neither a Duet 2 controller using RRF v2/3 nor a Duet 3 controller board, return an error state raise UnknownController('Unknown controller detected.') + except UnknownController as uc: - errorMsg = 'Unknown controller at " + self._base_url + " - does not appear to be an RRF2 or RRF3 printer' + errorMsg = ('Unknown controller at ' + self._base_url + + ' - does not appear to be an RRF2 or RRF3 printer') _logger.error(errorMsg) raise SystemExit(errorMsg) + except requests.exceptions.ConnectTimeout: errorMsg = 'Connect operation: Connection timed out.' _logger.critical(errorMsg) raise Exception(errorMsg) - # except HTTPException as ht: - # _logger.error('DuetWebAPIT init: Connection error.') + except Exception as e: - # Catastrophic error. Bail. _logger.critical('DuetWebAPI2 Init: ' + str(e)) raise Exception('DuetWebAPI Init: ' + str(e)) @@ -1291,4 +1307,4 @@ def getTriggerHeight(self): _errMsg = r.reason _logger.error("Bad resposne in getTriggerHeight: " + str(r.status_code) + ' - ' + str(r.reason)) return (_errCode, _errMsg, None) - \ No newline at end of file + From 4d119ffde40f83eae4f88b6b3bf1ca6b3efb0c3a Mon Sep 17 00:00:00 2001 From: Nurgelrot Date: Wed, 4 Mar 2026 05:04:57 -0700 Subject: [PATCH 2/2] Update DuetWebAPI.py Got Standalone working- I think need a standalone tool changer to test and done have one. --- drivers/DuetWebAPI.py | 140 +++++++++++++++++++++++++++++------------- 1 file changed, 96 insertions(+), 44 deletions(-) diff --git a/drivers/DuetWebAPI.py b/drivers/DuetWebAPI.py index 20cade6..a434d03 100644 --- a/drivers/DuetWebAPI.py +++ b/drivers/DuetWebAPI.py @@ -151,10 +151,9 @@ def __init__(self, baseURL, nickname='Default', password='reprap'): try: # --------------------------------------------------------------- - # PATH 1: Duet 2 / RRF legacy rr_ API - # ALWAYS call rr_connect regardless of password. - # RRF 3.6+ requires this even with the default 'reprap' password, - # and returns a sessionKey that must be used for all subsequent requests. + # PATH 1: Duet 2 / RRF 3.5.x legacy rr_ API (apiLevel 1) + # PATH 2: Duet 3 standalone RRF 3.6+ rr_model API (apiLevel 2) + # Both start with rr_connect - apiLevel in response tells us which path to take # --------------------------------------------------------------- _logger.debug('Attempting rr_connect..') URL = (f'{self._base_url}' + '/rr_connect?password=' + self._password) @@ -172,58 +171,111 @@ def __init__(self, baseURL, nickname='Default', password='reprap'): _logger.debug('rr_connect returned err != 0, trying Duet 3 SBC path..') raise DuetSBCHandler - # RRF 3.6+ introduced apiLevel 2 and sessionKey requirement. - # Extract and apply sessionKey to all subsequent requests if present. + # apiLevel tells us which status API to use api_level = connect_obj.get('apiLevel', 1) + _logger.debug(f'rr_connect succeeded, apiLevel={api_level}') + if api_level >= 2: + # --------------------------------------------------------------- + # PATH 2: Duet 3 standalone RRF 3.6+ - uses rr_model, requires sessionKey + # --------------------------------------------------------------- session_key = connect_obj.get('sessionKey') if session_key is not None: self.session.headers.update({'X-Session-Key': str(session_key)}) - _logger.debug(f'RRF apiLevel {api_level} detected - sessionKey {session_key} applied to session headers') + _logger.debug(f'apiLevel {api_level} - sessionKey {session_key} applied') - # Fetch full machine status - URL = (f'{self._base_url}' + '/rr_status?type=2') - r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) - if r.ok: - j = json.loads(r.text) - else: - raise DuetSBCHandler + # Fetch machine name + URL = (f'{self._base_url}' + '/rr_model?key=network') + r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) + if not r.ok: + raise Exception('Failed to fetch rr_model network data') + network_obj = json.loads(r.text) + self._name = network_obj['result']['name'] + + # Fetch boards (firmware info) + URL = (f'{self._base_url}' + '/rr_model?key=boards') + r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) + if not r.ok: + raise Exception('Failed to fetch rr_model boards data') + boards_obj = json.loads(r.text) + firmwareName = boards_obj['result'][0]['firmwareName'] + self._firmwareVersion = boards_obj['result'][0]['firmwareVersion'] + + # Fetch tools + URL = (f'{self._base_url}' + '/rr_model?key=tools') + r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) + if not r.ok: + raise Exception('Failed to fetch rr_model tools data') + tools_obj = json.loads(r.text) + toolData = tools_obj['result'] + + for inputTool in toolData: + try: + toolNumber = inputTool['number'] + except Exception: + toolNumber = 0 + try: + toolName = inputTool['name'] + except Exception: + toolName = "T" + str(toolNumber) + tempTool = Tool( + number=toolNumber, + name=toolName, + offsets={'X': inputTool['offsets'][0], 'Y': inputTool['offsets'][1], 'Z': inputTool['offsets'][2]}) + self._tools.append(tempTool) + _logger.debug('Added tool: ' + str(tempTool.getJSON())) - # Send reply to clear buffer - replyURL = (f'{self._base_url}' + '/rr_reply') - r = self.session.get(replyURL, timeout=(self._requestTimeout, self._responseTimeout)) - - # Get machine name - self._name = j['name'] - - # Setup tool definitions - toolData = j['tools'] - for inputTool in toolData: - tempTool = Tool( - number=inputTool['number'], - name=inputTool['name'], - offsets={'X': inputTool['offsets'][0], 'Y': inputTool['offsets'][1], 'Z': inputTool['offsets'][2]}) - self._tools.append(tempTool) - _logger.debug('Added tool: ' + str(tempTool.getJSON())) - - # Check firmware version - firmwareName = j['firmwareName'] - boardVersion = firmwareName[24] - self._firmwareVersion = j['firmwareVersion'] - - if self._firmwareVersion[0] == "2": - self._rrf2 = True - self.pt = 2 - else: self._rrf2 = False - self.pt = 2 + self.pt = 3 # treat as pt=3 since object model structure matches Duet 3 SBC + _logger.info(' .. connected to ' + firmwareName + ' - V' + self._firmwareVersion + ' (RRF 3.6+ standalone)..') + return - _logger.info(' .. connected to ' + firmwareName + ' - V' + self._firmwareVersion + '..') - return + else: + # --------------------------------------------------------------- + # PATH 1: Duet 2 / RRF 3.5.x - uses rr_status?type=2 + # --------------------------------------------------------------- + URL = (f'{self._base_url}' + '/rr_status?type=2') + r = self.session.get(URL, timeout=(self._requestTimeout, self._responseTimeout)) + if r.ok: + j = json.loads(r.text) + else: + raise DuetSBCHandler + + # Send reply to clear buffer + replyURL = (f'{self._base_url}' + '/rr_reply') + r = self.session.get(replyURL, timeout=(self._requestTimeout, self._responseTimeout)) + + # Get machine name + self._name = j['name'] + + # Setup tool definitions + toolData = j['tools'] + for inputTool in toolData: + tempTool = Tool( + number=inputTool['number'], + name=inputTool['name'], + offsets={'X': inputTool['offsets'][0], 'Y': inputTool['offsets'][1], 'Z': inputTool['offsets'][2]}) + self._tools.append(tempTool) + _logger.debug('Added tool: ' + str(tempTool.getJSON())) + + # Check firmware version + firmwareName = j['firmwareName'] + boardVersion = firmwareName[24] + self._firmwareVersion = j['firmwareVersion'] + + if self._firmwareVersion[0] == "2": + self._rrf2 = True + self.pt = 2 + else: + self._rrf2 = False + self.pt = 2 + + _logger.info(' .. connected to ' + firmwareName + ' - V' + self._firmwareVersion + '..') + return except DuetSBCHandler as sbc: # --------------------------------------------------------------- - # PATH 2: Duet 3 SBC / DSF API (/machine/ endpoints) + # PATH 3: Duet 3 SBC / DSF API (/machine/ endpoints) # --------------------------------------------------------------- try: _logger.debug('Trying to connect to Duet 3 SBC board..')