#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright by Scott Severance # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # IMPORTANT: Set the defaults below to values appropriate for your system. # # This module is compatible with Python 3. It is incompatible with Python 2. '''Monitor network connectivity and play a sound when the status changes. This script pings a host every 30 seconds to monitor whether the network is accessible. If the network goes down, it prints a message, plays a sound, and checks every 10 seconds for the network to be restored. When it's restored, it playes another sound and resumes monitoring. The following defaults are specified in the source at the module level: default_host: the host to ping if no other host is specified defaults_linux: a dictionary containing the default error sound (played when the network goes down), the default success sound (played when the network comes up) and the program to use to play sounds defaults_win: as with defaults_linux but for Windows systems Usage: {prog} [-u FILENAME] [-d FILENAME] [-p EXECUTABLE] [-s] [-c] [HOSTNAME] {prog} (--help | --version) Monitor network connectivity and alert on change. In most cases, you won't need to change any options from their defaults. Options: HOSTNAME The host to ping. [Default: {default_host}] -u FILENAME, --up-sound FILENAME The sound to play when the network comes up. [Default: {success_sound}] -d FILENAME, --down-sound FILENAME The sound to play when the network goes down. [Default: {error_sound}] -p EXECUTABLE, --player EXECUTABLE The program to use to play sounds. Only available on Linux. [Default: {player}] -s, --silent If specified, {prog} will not play any sounds. -c, --no-clear-screen If specified, the screen will not be cleared (nor will extra blank lines be added) before {prog} runs. ''' import datetime import multiprocessing as mp import os import re import subprocess import sys import time if __name__ == '__main__': use_docopt = True if os.path.isdir('/home/scott/bin/schema'): sys.path.insert(1, '/home/scott/bin/schema') try: from docopt import docopt except ImportError: #exit('Running this script requires the Python module "docopt."') use_docopt = False try: import schema except ImportError: #exit('Running this script requires the Python module "schema," which is ' # 'referenced by the "docopt" module.') use_docopt = False #### IMPORTS ELSEWHERE IN SOURCE: # import argparse # import socket # import textwrap # import winsound ################################################################################ # Set the following defaults to values appropriate on your system. # ################################################################################ default_host = '8.8.8.8' defaults_linux = { 'error_sound': '/home/scott/misc/sounds/MacOS Classic/Quack.wav', 'success_sound': '/home/scott/misc/sounds/MacOS Classic/Wild Eep.wav', 'player': 'aplay' } defaults_win = { 'error_sound': r'C:\Windows\media\ir_end.wav', 'success_sound': r'C:\Windows\media\tada.wav', 'player': None } ################################################################################ # End setting of defaults. # ################################################################################ version = 'network_monitor 1.0' if os.name == 'posix': defaults = defaults_linux else: defaults = defaults_win __doc__ = __doc__.format(prog=os.path.basename(sys.argv[0]), success_sound=defaults['success_sound'], error_sound=defaults['error_sound'], player=defaults['player'], default_host=default_host) class SoundNotFoundError(FileNotFoundError): pass class PlayerNotFoundError(FileNotFoundError): pass class Collection(object): '''A very simple class to hold arbitrary settings as properties. To initialize, pass in any desired settings as keyword arguments (you may use only keyword arguments). To use, simply access the keywords as properties. WARNING: If you add a property directly, without using the add_property method, it is your responsibility to ensure that there are no naming clashes with the class's methods. Example: foo = Collection(bar=1, baz='a') foo.bar # 1 foo.baz # 'a' foo.other = 6 # add a new item foo.add_property('other', 6) # alternate way to add a new item ''' def __init__(self, **kwargs): object.__setattr__(self, '_contained_items', set()) for k, v in kwargs.items(): self.add_property(k, v) #self.__dict__.update(kwargs) def __setattr__(self, name, value): if name in self._contained_items: object.__setattr__(self, name, value) else: self.add_property(name, value) def add_property(self, key, value): '''Adds a property with error-checking. Equivalant to self.key = value''' if key in (dir(self) or self._contained_items): raise ValueError('The key "{}" is already in use.'.format(key)) object.__setattr__(self, key, value) self._contained_items.add(key) def rm(self, *args): '''Deletes the listed properties''' for i in args: if i in self._contained_items: del self.__dict__[i] self._contained_items.remove(i) else: raise AttributeError( 'The attribute "{}" does not exist or cannot be removed.'.format(i) ) def __repr__(self): return '{cls}({args})'.format( cls=type(self).__name__, args=', '.join(sorted( ['{}={}'.format(k, repr(v)) for k, v in self.__dict__.items() if not k.startswith('_')] )) ) class Pinger(): '''Handles pinging This class is responsible for sending pings. It takes one optional argument, the host to ping. If the host isn't specified, or is None, then it will be set to the value of the module-level variable default_host. ''' def __init__(self, host=None): if host is None: self.host = default_host else: self.host = host def ping(self, count=1, timeout=10): '''Sends the ping This method sends a ping using the shell command appropriate to the OS it is running under. The parameter "count" specifies the number of ping packets to send. This method returns the ping command's integer exit status, which is 0 for success and nonzero for error. ''' if os.name == 'posix': cmd = ['ping', '-qc' + str(count), self.host] else: cmd = ['ping', '-n', str(count), self.host] s = subprocess try: return s.call(cmd, stdout=s.DEVNULL, stderr=s.DEVNULL, timeout=timeout) except subprocess.TimeoutExpired: return 1 except Exception: # Race condition at exit sometimes results in ProcessLookupError and # maybe other exceptions. Swallow them all. return 2 class Monitor(): '''Handles monitoring This class is responsible for monitoring network connectivity. The normal way to use it is via the monitor() method. Parameters: pinger: an instance of the Pinger class error_sound: path to the sound file to play when the network goes down success_sound: path to the sound file to play when the network comes up player: the program to play sounds. See the class SoundPlayer for details silent: If true, no sounds will be played If any parameter other than pinger is unspecified or None, it will be set from the OS-specific defaults (defaults_linux or defaults_win) which are stored in module-level variables. ''' def __init__(self, pinger, error_sound=None, success_sound=None, player=None, silent=False): if error_sound is None: error_sound = defaults['error_sound'] if success_sound is None: success_sound = defaults['success_sound'] self.pinger = pinger if not isinstance(pinger, Pinger): raise TypeError("pinger must be an instance of class Pinger") self.player = SoundPlayer(player=player, success=success_sound, error=error_sound, silent=silent) def network_is_up(self): '''Tests whether the network is up. This method first sleeps, then tries to ping. If successful, it returns True. If unsuccessful, it tries a second time before returning False. This method is not normally called directly. ''' time.sleep(30) if self.pinger.ping() == 0: return True else: time.sleep(1) if self.pinger.ping() == 0: return True return False def network_is_down(self): '''Tests whether the network is down. This method first sleeps, then tries to ping. If successful, it returns False. Otherwise, it returns True. This method is not normally called directly. ''' time.sleep(10) if self.pinger.ping() == 0: return False else: return True def monitor(self, printer): '''Monitors the network. This method handles monitoring network up and down states (using the network_is_up() and network_is_down() methods. It also uses the Printer class to print the status to the console in an OS-appropriate manner. It takes one argument, an object of class Printer. It runs in an infinite loop. To break out of the loop, cause an exception, such as KeyboardInterrupt, to be raised (e.g., by pressing ^C). The calling code should then handle the exception appropriately. ''' if not isinstance(printer, Printer): raise TypeError("printer must be an instance of the Printer class") printer.begin() net_up = True printer.net_up() try: while True: printer.dot() if net_up: net_up = self.network_is_up() if not net_up: self.player.error() printer.net_down() else: net_up = not self.network_is_down() if net_up: self.player.success() printer.net_up() finally: printer.end() class Printer(): '''Class to handle printing to the terminal. This class contains methods to enable the network monitor to print to the terminal in a cross-platform way. If subclassing, you will most likely need to override two methods: __init__ and _print. ''' def __init__(self, pinger, no_clear_screen=False): '''Sets up an instance Arguments: pinger MUST be an instance of type Pinger. It SHOULD be the the same instance as used to do the actual pinging or a copy thereof. no_clear_screen: If true, the screen won't be cleared upon startup, nor will a sequence of blank lines be inserted. ''' self._host = pinger.host self._rich_formatting = False self._dot_counter = 0 self._dot_line_counter = 0 self._dot_space_counter = 0 self._term_width = 0 self._mode = 'up' self.no_clear_screen = no_clear_screen if os.name == 'posix' and 'TERM' in os.environ and re.match('xterm', os.environ['TERM']): self._rich_formatting = True if self._rich_formatting: self._title_esc_start = r'\033]0;' # titlebar escape code - start self._title_esc_end = r'\007' # titlebar escape code - end self._color1a = r"\033[4;34m" self._color2a = r"\033[0;31m" self._color3a = r"\033[0;33m" self._color1b = r"\033[4;34m" self._color2b = r"\033[1;32m" self._color3b = r"\033[36m" self._transparent = r"\033[0m" #Transparent - don't change self._line_begin = '»' self._line_end = '«' else: self._title_esc_start = '' self._title_esc_end = '' self._color1a = '' self._color2a = '' self._color3a = '' self._color1b = '' self._color2b = '' self._color3b = '' self._transparent = '' self._line_begin = '<' self._line_end = '>' self._window_width() def begin(self): '''Initializes the terminal in preparation for program run''' self.clear_screen() if self._term_width != 74: self._print('>> Current window width: {}\n>> Suggested window width is 74, which results in 50 dots per line.'.format(self._term_width), True) if self._rich_formatting: c = subprocess.call c(['echo', '-ne', "{title_st} Monitoring {host}...{title_end}".format(title_st=self._title_esc_start, host=self._host, title_end=self._title_esc_end)]) #c(['echo', '-ne', r'\E[?25l']) ## Hide the cursor # For some reason this works in Bash but not Python. c(['stty', '-echo']) ## disable echoing to avoid messing up the output self._first_use = True self._print('{color3a}{cur_time}{transparent} Monitoring {color1a}{host}{transparent}...'.format( color3a=self._color3a, cur_time=self._time(), transparent=self._transparent, color1a=self._color1a, host=self._host ), True) def end(self): '''Prints final message and handles cleanup just before program exit. It is recommended to call this within a finally block to ensure that it gets run. ''' if self._mode == 'up': color3 = self._color3a else: color3 = self._color3b self._print('{c3}{ln_end}{transp}'.format(c3=color3, ln_end=self._line_end, transp=self._transparent), True) if self._rich_formatting: c = subprocess.call c(['echo', '-ne', "{start}{end}".format(start=self._title_esc_start, end=self._title_esc_end)]) c(['stty', 'echo']) ## re-enable echoing #c(['echo', '-ne', r'\E[?25h']) ## Show the cursor # see self.begin() self._print('Current ping count: {}. Exiting.'.format(self._dot_counter), True) if os.name == 'nt': self._print('Exiting in: ', False) for i in reversed(range(0, 6)): time.sleep(1) self._print('{} '.format(str(i)), False) self._print('', True) def net_up(self): '''Switches the printer to network up mode and prints appropriate messages.''' self._dot_line_counter = 13 # First dot indent self._dot_space_counter = 0 self._mode = 'up' if not self._first_use: if self._dot_counter == 1: tests = 'test' else: tests = 'tests' self._print('{c3b}{ln_end}{transp}\n{c3b}{cur_time}{transp} {c2b}You now have network access{transp} after {dots} {tsts}.'.format( c3b=self._color3b, ln_end=self._line_end, transp=self._transparent, cur_time=self._time(), c2b=self._color2b, dots=str(self._dot_counter), tsts=tests ), True) self._dot_counter = 0 self._print('Monitoring: {c3a}{ln_beg}{transp}'.format(c3a=self._color3a, ln_beg=self._line_begin, transp=self._transparent), False) self._first_use = False def net_down(self): '''Switches the printer to network down mode and prints appropriate messages.''' self._dot_line_counter = 13 self._dot_space_counter = 0 self._mode = 'down' if not self._first_use: if self._dot_counter == 1: tests = 'test' else: tests = 'tests' self._print('{c3a}{le}{transp}\n{c2a}{cur_time}{transp} {c3b}The host {c1b}{host}{transp}{c3b} is unreachable{transp} after {dots} {tsts}.'.format( c3a=self._color3a, le=self._line_end, transp=self._transparent, c2a=self._color2a, cur_time=self._time(), c3b=self._color3b, dots=str(self._dot_counter), tsts=tests, c1b=self._color1b, host=self._host ), True) self._dot_counter = 0 self._print('{c3b}{cur_time}{transp} Pinging {c1b}{host}{transp}...'.format( c3b=self._color3b, cur_time=self._time(), transp=self._transparent, c1b=self._color1b, host=self._host ), True) self._print('Testing: {c3b}{ln}{tr}'.format(c3b=self._color3b, ln=self._line_begin, tr=self._transparent), False) self._first_use = False def dot(self): '''Prints the dots. This method is responsible for visual feedback regarding each ping that doesn't result in a mode change. In addition to printing dots, it also handles formatting the result onscreen. ''' self._window_width() if self._mode == 'up': color3 = self._color3a else: color3 = self._color3b if self._dot_line_counter >= (self._term_width - 2): self._print('{c3}{le}{tr}\n {c3}{lb}{tr}'.format( c3=color3, le=self._line_end, tr=self._transparent, lb=self._line_begin ), False) self._dot_line_counter = 13 self._dot_space_counter = 0 if self._dot_space_counter > 1 and self._dot_space_counter % 5 == 0 and self._dot_line_counter <= (self._term_width - 3): if self._rich_formatting: self._print(' ·', False) else: self._print(' .', False) self._dot_line_counter += 2 else: if self._rich_formatting: self._print('·', False) else: self._print('.', False) self._dot_line_counter += 1 self._dot_counter += 1 self._dot_space_counter += 1 def clear_screen(self): '''Clears the screen''' if self.no_clear_screen: pass elif os.name == 'posix': subprocess.call(['clear']) else: self._print('\n\n\n\n', True) def _print(self, string, newline): '''Handles printing for mixed environments. Parameters: string: The string to print newline: Boolean where True means to print a newline after the the string and False means not to do so ''' if self._rich_formatting: if newline: opt = '-e' else: opt = '-ne' subprocess.call(['echo', opt, string]) else: if newline: print(string, flush=True) else: print(string, end='', flush=True) def _time(self): '''Returns a formatted time''' return datetime.datetime.now().strftime('%I:%M:%S %p') def _window_width(self): '''Determines the width of the window''' self._term_width, height = os.get_terminal_size() class SoundPlayer(): '''Plays sounds This class takes care of playing sounds in a cross-platform manner. This class determines its methods based on the parameters given at initialization. Calling the method named by one of the keyword arguments supplied at creation will play that sound. Example: player = SoundPlayer(error='error.wav', success='success.wav') player.error() # Plays 'error.wav' player.success() # Plays 'success.wav' ''' def __init__(self, silent=False, player=None, **sounds): '''Creates the instance. Available sounds are created by giving the sound name as a keyword argument and the path to the sound file as the argument value. The path may be specified in any form that can be directly accessed from this program, relative or absolute. No transformations will be applied. If player is None, the player will be chosen based on the OS default. On posix, that's the value of defaults_linux['player']. On Windows, the winsound module will be used unconditionally and player will be ignored, if it is set to None. If you specify player (on any OS), the value will be interpreted as the beginning of a command line to be passed to subprocess.call. If silent is True, all calls to play sounds will return immediately without playing anything. NOTE 1: All arguments should be specified as keyword arguments. NOTE 2: The default player under Windows can only handle .wav files. Example: SoundPlayer(error='/path/to/error.wav', success='/path/to/success') ''' self.player = player self.sounds = sounds self.silent = silent if not silent: for name, path in sounds.items(): if not os.path.exists(path): raise SoundNotFoundError(path) self._use_subprocess = True if player is None and not silent: if os.name == 'posix': self.player = defaults['player'] else: try: import winsound as w except ImportError: #sys.exit("Unable to use default sound player. Please specify an alternate.") raise PlayerNotFoundError("Python's Windows-only winsound module") self._use_subprocess = False self.player = w.PlaySound self._winsound_options = w.SND_FILENAME|w.SND_ASYNC|w.SND_NOSTOP elif not isinstance(player, str): raise TypeError("player must be a string or None") if len(sounds) < 1: raise ValueError("You must specify at least one sound") if self._use_subprocess and not silent: s = subprocess try: s.call([self.player], stdout=s.DEVNULL, stderr=s.DEVNULL) except FileNotFoundError as e: raise PlayerNotFoundError(self.player) from e def __getattr__(self, name): '''The magic is here. This method is what enables dynamic method creation. See the class docstring for details. ''' if name not in self.sounds.keys(): raise AttributeError('The sound "{}" is not defined.'.format(name)) def wrapper(*args, **kwargs): return self._play(name) return wrapper def __str__(self): '''Returns a useful, human-readable string representation of the instance''' return "".format(', '.join(sorted(self.sounds.keys()))) def __repr__(self): player = self.player if not isinstance(player, str): player = None methods = ', '.join(['{}={}'.format(j, repr(k)) for j, k in self.sounds.items()]) return 'SoundPlayer(player={}, silent={}, {})'.format(repr(player), repr(self.silent), methods) def _play(self, name): '''Plays the sound given by name''' if self.silent: return True def play_sound(q=None): # Called in a separate process so playing the sound doesn't block anything else. '''q is a multiprocessing.Queue object for interprocess communication.''' try: snd = self.sounds[name] if self._use_subprocess: s = subprocess return s.call([self.player, snd], stdout=s.DEVNULL, stderr=s.DEVNULL, timeout=20) else: return self.player(snd, self._winsound_options) except (KeyboardInterrupt, EOFError): sys.exit(0) except subprocess.TimeoutExpired as e: #if q: # q.put(e) sys.stderr.write('\nSOUND ERROR: Unable to play the sound "{}": Timeout expired.\n\n'.format(snd)) sys.exit(1) except FileNotFoundError: sys.stderr.write('\nSOUND ERROR: Unable to locate the sound player "{}".\n\n'.format(self.player)) sys.exit(2) #q = mp.Queue() proc_name = 'Play sound: {}; time: {}'.format(name, datetime.datetime.now().strftime('%I:%M:%S %p')) proc = mp.Process(target=play_sound, name=proc_name)#, args=(q,)) proc.start() #if not q.empty(): # print('hi') # raise q.get() def _arg_parser(): '''Parses the command-line arguments. To be used when the module is loaded directly, as opposed to being imported. ''' def posix_only(s): if os.name == 'posix': return s else: raise OSError('Wrong OS type') def valid_hostname(name): def is_valid_hostname(hostname): ### Taken from http://stackoverflow.com/a/33214423/713735 if hostname[-1] == ".": # strip exactly one dot from the right, if present hostname = hostname[:-1] if len(hostname) > 253: return False # must be not all-numeric, so that it can't be confused with an ip-address if re.match(r"[\d.]+$", hostname): return False allowed = re.compile("(?!-)[A-Z\d-]{1,63}(?>> SWITCHING TO SILENT MODE. <<<\n\n'.format(textwrap.fill(err, width=width))) args_collection['silent'] = True args.no_clear_screen = True monitor = Monitor(**args_collection) except PlayerNotFoundError as e: sys.stderr.write("\nERROR: The specified sound player ({}) doesn't exist. Switching to silent mode.\n\n".format(e)) args_collection['silent'] = True args.no_clear_screen = True monitor = Monitor(**args_collection) printer = Printer(pinger, args.no_clear_screen) try: # and away we go... monitor.monitor(printer) except (KeyboardInterrupt, EOFError): sys.exit(0) # except FileNotFoundError: # sys.stderr.write("\nERROR: The specified sound player doesn't exist. Switching to silent mode.\n\n") # args_collection['silent'] = True # monitor = Monitor(**args_collection) # try: # monitor.monitor(Printer(pinger, True)) # try again # except KeyboardInterrupt: # sys.exit(0) if __name__ == '__main__': _main()