Source code for APOAshDome

"""
Low level inteface to APO Ash Dome
"""

import time
import timer
import numpy as np
from threading import Timer, Lock, Thread
from logging import Logger
from tristate import Tristate
import Encoder

# put in try/except for readthedocs
try :
    import piplates.RELAYplate as RELAY
    import APOSafety
    import RPi.GPIO as GPIO
except :
    print('no APOSafety or RPi.GPIO')

# RELAYPlates relay numbers for different operations
DOME_POWER = 1      #relay 207  pin 47
UPPER_POWER = 2     #relay 205  pin 48
DOME_DIRECTION = 3  #relay 208  pin 5
UPPER_DIRECTION = 4 #relay 206  pin 4
WATCHDOG_RESET = 5  #relay 201  pin 50
LOWER_DIRECTION = 6
LOWER_POWER = 7

# GPIO bit for home sensor
HOME = 16

# time before shutters are reigster open or closed
UPPER_TIME = 86
LOWER_TIME = 5

# Park and home positions
PARK_POSITION = 60
HOME_POSITION = 80
DOME_TOLERANCE = 3

# GPIO pins for azimuth encoder
ENCODER_A = 5
ENCODER_B = 12
# scale for azimuth encoder
steps_per_degree = -724  

from enum import Enum
[docs] class ShutterState(Enum) : shutterOpen = 0 # Dome shutter status open shutterClosed = 1 # Dome shutter status closed shutterOpening = 2 # Dome shutter status opening shutterClosing = 3 # Dome shutter status closing shutterError = 4 # Dome shutter status error
[docs] class Dome() : def __init__(self, logger=None ) : """ Initialize dome properties and capabilities """ self.connected = True self.altitude = None #try: self.is_upper_open = Tristate() self.is_upper_closed = Tristate() self.is_lower_open = Tristate() self.is_lower_closed = Tristate() self.cansetaltitude = False self.cansetazimuth = True self.cansetpark = True self.cansetshutter = True self.canfindhome = True self.canslave = False self.canpark = True self.cansyncazimuth = True self.slaved = False self.shutterstatus = ShutterState.shutterError.value self.slewing = False self.park_position = PARK_POSITION self.verbose = True self.enc = Encoder.Encoder(ENCODER_A,ENCODER_B) GPIO.setup(HOME, GPIO.IN) self.SupportedActions=['weather'] try: with open("SavedPosition.txt") as fp : self.azimuth=float(fp.read()) self.enc.pos = int((self.azimuth - HOME_POSITION) * steps_per_degree) except: self.azimuth = HOME_POSITION self.logger = logger self.logger.info('Instantiating dome device') self.start_watchdog() self.start_weather()
[docs] def save_position(self) : """ Save current position to file """ self.get_azimuth() with open("SavedPosition.txt","w") as fp : fp.write('{:7.1f}'.format(self.azimuth))
[docs] def start_weather(self) : """ Start weather monitoring thread """ self.safety=APOSafety.Safety(warnonly=False,use25m=True) t=Thread(target=self.monitor_weather) t.start()
[docs] def monitor_weather(self,timeout=90) : """ Check weather periodically """ while True : if not self.safety.issafe() and \ not (self.shutterstatus == ShutterState.shutterClosed.value) and\ not (self.shutterstatus == ShutterState.shutterClosing.value) : self.close_shutter() time.sleep(timeout)
[docs] def reset_watchdog(self,timeout=110) : """ Reset watchdog periodically """ while True : self.logger.info('resetting watchdog') set_relay(WATCHDOG_RESET,1) time.sleep(5) set_relay(WATCHDOG_RESET,0) time.sleep(timeout)
[docs] def start_watchdog(self) : """ Start thread to periodically reset watchdog """ t=Thread(target=self.reset_watchdog) t.start()
[docs] def home(self) : """ Send dome to home asynchronously """ if not self.athome() : t=Thread(target=self.sendhome) t.start()
[docs] def sendhome(self,timeout=180) : """ Go to home """ self.logger.info('sending home') self.rotate(1) t=timer.Timer() t.start() while not self.athome() and t.elapsed()<timeout : time.sleep(0.1) continue if t.elapsed() < timeout : self.logger.info('Encoder position at home: {:d}'.format(self.enc.pos)) self.enc.pos = 0 self.logger.info('Setting to zero{:d}'.format(self.enc.pos)) self.azimuth = HOME_POSITION set_relay(DOME_POWER,0) self.slewing = False self.logger.info('hit home') else : self.logger.info('Home timer expired before finding home !') t.stop() self.save_position()
[docs] def athome(self) : """ Check if at home position """ if GPIO.input(HOME) : return True else : return False
[docs] def set_upper_open(self) : """ Set upper shutter status to open and turn off shutter power """ self.logger.info('setting upper shutter open') self.is_upper_open = True self.shutterstatus = ShutterState.shutterOpen.value set_relay(UPPER_POWER,0) set_relay(UPPER_DIRECTION,0)
[docs] def set_upper_closed(self) : """ Set upper shutter status to closed and turn off shutter power """ self.logger.info('setting upper shutter closed') self.is_upper_open = False self.shutterstatus = ShutterState.shutterClosed.value set_relay(UPPER_POWER,0)
[docs] def open_upper(self) : """ Open upper shutter asynchronously """ set_relay(UPPER_POWER,0) self.logger.info('starting shutter open') set_relay(UPPER_DIRECTION,0) set_relay(UPPER_POWER,1) self.shutterstatus = ShutterState.shutterOpening.value t=Timer(UPPER_TIME,self.set_upper_open) t.start()
[docs] def close_upper(self) : """ Close upper shutter """ set_relay(UPPER_POWER,0) self.logger.info('starting shutter close') set_relay(UPPER_DIRECTION,1) set_relay(UPPER_POWER,1) self.shutterstatus = ShutterState.shutterClosing.value t=Timer(UPPER_TIME,self.set_upper_closed) t.start()
[docs] def set_lower_open(self) : """ Set lower shutter status to open and turn off shutter power """ self.is_lower_open = True set_relay(LOWER_POWER,0)
[docs] def set_lower_closed(self) : """ Set lower shutter status to closed and turn off shutter power """ self.is_lower_open = True set_relay(LOWER_POWER,0) set_relay(LOWER_DIRECTION,0)
[docs] def open_lower(self) : """ Open lower shutter asynchronously """ if is_upper_open == True : set_relay(LOWER_POWER,0) set_relay(LOWER_DIRECTION,1) set_relay(LOWER_POWER,1) t=Timer(LOWER_TIME,self.set_upper_open) t.start() else : raise RuntimeError('cannot open lower shutter when upper shutter is not open')
[docs] def close_lower(self) : """ Close lower shutter """ if is_upper_open == True : set_relay(LOWER_POWER,0) set_relay(LOWER_DIRECTION,1) set_relay(LOWER_POWER,1) t=Timer(LOWER_TIME,self.set_upper_open) t.start() else : raise RuntimeError('cannot close lower shutter when upper shutter is not open')
[docs] def open_shutter(self,lower=False) : """ Open the dome shutter(s). If lower, wait 10s after starting upper to start lower """ if not self.safety.issafe() : self.logger.info('cannot open shutter due to weather condition!') return self.open_upper()
#if lower : # time.sleep(10) # self.open_lower()
[docs] def close_shutter(self,lower=False) : """ Close the dome shutter(s). If lower, wait 30s after starting lower to start upper """ if lower : self.close_lower() time.sleep(30) self.close_upper()
[docs] def atpark(self) : """ Is telescope at park position? """ az = self.get_azimuth() if abs(az-self.park_position) < DOME_TOLERANCE : return True else : return False
[docs] def set_park(self) : """ Set park position to current position """ self.park_position = self.get_azimuth()
[docs] def park(self) : """ Send dome to park asynchronously """ if not self.atpark() : t=Thread(target=self.sendpark) t.start()
[docs] def sendpark(self) : """ Go to park """ self.logger.info('sending to park') self.slewtoazimuth(self.park_position)
[docs] def abort_slew(self) : """ Turn off dome rotation power """ self.logger.info('abort: turning dome rotation power off') self.stop()
[docs] def stop(self) : """ Stop dome rotation """ self.logger.info('stopping dome rotation ') set_relay(DOME_POWER,0) self.slewing = False self.enc.delta=np.array([0,1,-1,2,-1,0,-2,1,1,-2,0,-1,2,-1,1,0])
#self.logger.debug('counter: {:d}'.format(self.enc.counter)) #self.logger.debug('counter16: {:d}'.format(self.enc.counter16)) #self.logger.debug('delta: {:f}'.format(self.enc.delta)) #self.logger.debug('total events: {:d}'.format(np.sum(self.enc.counter16))) #self.logger.debug('sum: {:f}'.format(np.sum(self.enc.delta*self.enc.counter16))) #self.logger.debug('pos: {:d}'.format(self.enc.pos))
[docs] def rotate(self,cw=True) : """ Start dome rotating """ self.stop() if self.verbose : self.logger.info('starting dome rotation: {:d} '.format(cw)) self.enc.reset() if cw : set_relay(DOME_DIRECTION,0) #self.enc.delta=[0,1,3,2,3,0,2,1,1,2,0,3,2,3,1,0] else : set_relay(DOME_DIRECTION,1) #self.enc.delta=[0,-3,-1,-2,-1,0,-2,-3,-3,-2,0,-1,-2,-1,-3,0] set_relay(DOME_POWER,1) self.slewing = True
[docs] def get_azimuth(self) : """ Get current dome azimuth """ self.azimuth = self.enc.pos/steps_per_degree + HOME_POSITION self.azimuth %= 360 return self.azimuth
[docs] def slewtoazimuth(self,azimuth) : """ Start slew to requested azimuth """ t=Thread(target=lambda : self.gotoazimuth(azimuth)) t.start()
[docs] def gotoazimuth(self,azimuth,timeout=180) : """ slew to requested azimuth """ current_az = self.azimuth self.logger.info('desired_az: {:f}'.format(azimuth)) self.logger.info(' current_az: {:f}'.format(current_az)) delta = diff(azimuth,current_az) self.logger.info(' delta: {:f}'.format(delta)) if abs(delta) < DOME_TOLERANCE : return elif delta > 0 : self.rotate(1) #if going CW, undershoot to allow coast delta = -1.0 else : self.rotate(0) #if going CCW, overshoot to allow coast delta = +1.0 t=timer.Timer() t.start() x = lambda azimuth : diff(azimuth,self.get_azimuth()) while abs(x(azimuth+delta)) > DOME_TOLERANCE/4. and t.elapsed()<timeout : #print(self.azimuth) #time.sleep(1) continue self.stop() if t.elapsed() > timeout : self.logger.info('Rotate timer expired before reaching desired azimuth !') self.logger.info('self.azimuth: {:f}'.format(self.azimuth)) t.stop() time.sleep(2) self.save_position()
def slewtoaltitude(self, altitude) : return #raise RuntimeError('altitude slew not implemented') def slave(self,val) : #self.slaved=True raise RuntimeError('slaving not available')
[docs] def set_relay(bit,value) : """ Utility routine to turn RELAYplates relays on (1) or off Always sleep 200 ms after a call to give system time to respond before subsequent call """ if value == 1 : RELAY.relayON(0,bit) else : RELAY.relayOFF(0,bit) time.sleep(0.2) return
def get_bit(bit,fake=None) : if fake is not None : return fake else : return 0
[docs] def diff(azimuth,current_az) : """ Get proper delta dome motion """ delta = ( azimuth - current_az ) if delta > 180 : delta-=360 elif delta < -180 : delta+=360 return delta