fuse_kafka
src/fuse_kafka.py
00001 #!/usr/bin/env python
00002 # chkconfig: 2345 11 88
00003 ### BEGIN INIT INFO
00004 # Provides: fuse_kafka
00005 # Required-Start:
00006 # Required-Stop:
00007 # Default-Start:  3 5    
00008 # Default-Stop:
00009 # Short-Description: run fuse_kafka
00010 # Description:
00011 ### END INIT INFO
00012 """ @package fuse_kafka
00013 Startup script for fuse_kafka.
00014 """
00015 import fnmatch, re, sys, getopt, json, glob, os, subprocess,\
00016     copy, time, subprocess, multiprocessing
# fcntl is not available on every platform; canlock records whether the
# module could be imported so that callers can skip file locking when it
# could not.
canlock = True
try:
    import fcntl
except ImportError:
    # only a missing module means "no locking"; any other error propagates
    canlock = False
# CONFIGURATIONS_PATHS is the list of paths (glob patterns) where the init
# script will look for configurations
CONFIGURATIONS_PATHS = ["./conf/*", "./etc/fuse_kafka.conf", "/etc/fuse_kafka.conf", "/etc/*.txt", "C:/temp/*.txt"]
class Crontab:
    """ manages a crontab through the `crontab` command """
    def add_line_if_necessary(self, line):
        """ Adds a line to a crontab if it is not there.

        line - the complete crontab entry, without trailing newline

        The current content is read from the output of `crontab -l`.
        When no existing line is equal to the given one, the previous
        content followed by the new line is written to the standard
        input of `crontab`.
        """
        # universal_newlines makes the pipes carry text on python 2 and
        # python 3 alike; python 3 would otherwise hand back bytes, which
        # can neither be split on "\n" nor concatenated with line.
        crontab = subprocess.Popen(["crontab", "-l"],
                stdout = subprocess.PIPE,
                universal_newlines = True).communicate()[0]
        if line in crontab.split("\n"):
            return
        # do not glue the new entry to an unterminated last line
        if crontab and not crontab.endswith("\n"):
            crontab += "\n"
        subprocess.Popen(["crontab"], stdin = subprocess.PIPE,
                universal_newlines = True).communicate(
                input=crontab + line + "\n")
class Mountpoints:
    """ Helper used to umount mountpoints which do not answer or which
    are not writable """
    def access(self, path):
        """ Checks write access to a path without risking to block.

        path - the path to check

        os.access is first run in a separate process which is given two
        seconds to finish. If it is still alive after that delay, it is
        terminated and False is returned. Otherwise the result of
        os.access(path, os.W_OK), called from the current process, is
        returned.
        """
        probe = multiprocessing.Process(target=os.access,
                args=(path, os.W_OK))
        probe.start()
        probe.join(2)
        if not probe.is_alive():
            # the probe came back in time: do the real check here
            return os.access(path, os.W_OK)
        probe.terminate()
        probe.join()
        return False
    def umount_non_accessible(self):
        """ Goes through each configured directory. A directory which is
        not accessible is checked again ten seconds later, and is lazily
        umounted with fusermount if it is still not accessible """
        for path in Configuration().conf['directories']:
            if self.access(path):
                continue
            time.sleep(10)
            if self.access(path):
                continue
            subprocess.call(["fusermount", "-uz", path])
class Configuration:
    """ Utility class to load configurations from properties files.

    The parsed configuration is stored in self.conf, a dictionary which
    maps an option name to a list of values. After load(), self.conf
    always has a 'directories' entry (possibly an empty list).
    """
    def get_property(self, path, name):
        """ Get a property from a well defined property file.

        path - configuration file path
        name - property name

        Returns the first property value found in the given path with the
        given name (surrounding whitespaces removed), None if it was not
        found
        """
        with open(path) as f:
            for line in f:
                line = line.split('=', 1)
                if len(line) == 2 and line[0] == name:
                    return line[1].strip()
        return None
    def includes_subdir(self, dirs, subdir):
        """ Checks if a subdirectory is included in a list of prefix.

        dirs    - list of prefixes
        subdir  - path to check for prefix

        Returns True if dirs contains a prefix of subdir, False
        otherwise. The comparison is a plain string prefix test.
        """
        return any(subdir.startswith(prefix) for prefix in dirs)
    def exclude_directories(self, paths, prefixes):
        """ Exclude directories from a list of directories based on
        prefixes

        paths       - list of paths from which to exclude prefixes
        prefixes    - list of prefixes to exclude

        Returns the path list with excluded directories. Each path is
        resolved with os.path.realpath before being compared to the
        prefixes; the returned paths are the unresolved ones.
        """
        return [path for path in paths
                if not self.includes_subdir(prefixes, os.path.realpath(path))]
    def exclude_from_conf(self, paths):
        """ Removes from the configured directories those which start
        with one of the given prefixes

        paths - list of prefixes to exclude
        """
        self.conf['directories'] = self.exclude_directories(
                self.conf['directories'], paths)
    def __init__(self, configurations = None):
        """ Loads the configuration.

        configurations - list of glob patterns of the files to load,
                         CONFIGURATIONS_PATHS when not given
        """
        # the default is resolved here rather than in the signature so
        # that the module level list is looked up at call time
        if configurations is None:
            configurations = CONFIGURATIONS_PATHS
        self.configurations = configurations
        self.sleeping = False
        self.load()
    def parse_line(self, line, conf):
        """ Parse a configuration line

        line - the line to parse
        conf - a dictionary which will be updated based on the parsing

        Only lines of the form key=value whose key starts with
        'monitoring_logging_' or 'fuse_kafka_', or is
        'monitoring_top_substitutions', are taken into account. The
        prefixes are stripped from the key and the value is parsed as
        json: the items of a list are appended to conf[key], the keys
        and values of an object are appended alternately.

        conf is updated in place, nothing is returned.
        """
        line = line.split('=', 1)
        if len(line) != 2:
            return
        key = line[0]
        if not (key.startswith('monitoring_logging_')
                or key.startswith('fuse_kafka_')
                or key == 'monitoring_top_substitutions'):
            return
        key = key.replace('monitoring_', '')
        key = key.replace('fuse_kafka_', '')
        key = key.replace('logging_', '').replace('top_', '')
        if key not in conf:
            conf[key] = []
        parsed = json.loads(line[1])
        if isinstance(parsed, dict):
            for parsed_key in parsed.keys():
                conf[key].append(parsed_key)
                conf[key].append(parsed[parsed_key])
        else:
            conf[key].extend(parsed)
    def is_sleeping(self, var_run_path = "/var/run"):
        """ Returns True if fuse_kafka is in sleep mode, that is if
        a fuse_kafka_backup entry exists in var_run_path """
        return os.path.exists(var_run_path + '/fuse_kafka_backup')
    def unique_directories(self, conf_directories):
        """ return a list with duplicates removed
        (in absolute path) from conf_directories.

        The first spelling of each directory is kept, in its original
        order. """
        directories = []
        seen = set()
        for directory in conf_directories:
            absolute_directory = os.path.abspath(directory)
            if absolute_directory not in seen:
                seen.add(absolute_directory)
                directories.append(directory)
        return directories
    def load(self, var_run_path = "/var/run"):
        """ Loads configuration from configurations files.

        var_run_path - directory where the sleep mode marker is looked
                       for

        In sleep mode, the directories matching the 'sleep' prefixes are
        removed from the configured directories. The 'sleep' entry itself
        is never kept in the resulting configuration.
        """
        self.conf = {}
        for globbed in self.configurations:
            for config in glob.glob(globbed):
                with open(config) as f:
                    for line in f:
                        self.parse_line(line, self.conf)
        # a configuration without directories (or without any file at
        # all) is an empty configuration, not an error
        self.conf.setdefault('directories', [])
        sleep = self.conf.pop('sleep', [])
        if self.is_sleeping(var_run_path):
            self.exclude_from_conf(sleep)
        self.conf['directories'] = self.unique_directories(self.conf['directories'])
    def args(self):
        """ Returns the fuse_kafka binary arguments based on the
        parsed configuration: for each option, '--' followed by the
        option name, then each of its values as a string """
        result = []
        for key in self.conf.keys():
            result.append('--' + str(key))
            result.extend(str(item) for item in self.conf[key])
        return result
    def __str__(self):
        """ Returns the arguments separated by spaces """
        return " ".join(self.args())
class FuseKafkaService:
    """ Utility class to run multiple fuse_kafka processes as one service.

    Each public method without argument (start, stop, status, restart,
    reload, cleanup...) is an action which can be run through do().
    """
    def __init__(self):
        """ Sets the auditctl binary path, the /proc/mounts path and the
        fuse_kafka command line prefix. The content of the
        FUSE_KAFKA_PREFIX environment variable, if set, is split on
        whitespaces and put in front of the command line. """
        self.auditctl_bin = "/sbin/auditctl"
        self.prefix = ["fuse_kafka", "_", "-oallow_other",
                "-ononempty", "-omodules=subdir,subdir=.", "-f", "--"]
        self.proc_mount_path = "/proc/mounts"
        if "FUSE_KAFKA_PREFIX" in os.environ:
            self.prefix = os.environ["FUSE_KAFKA_PREFIX"].split() + self.prefix
    def do(self, action):
        """ Actually run an action

        action - the action name, i.e. the name of the method to call

        Raises AttributeError if there is no attribute with that name.
        """
        getattr(self, action)()
    def start(self):
        """ Starts fuse_kafka unless it is already running """
        if self.get_status() == 0:
            print("fuse_kafka is already running")
            return
        self.start_excluding_directories([])
    def start_excluding_directories(self, excluded):
        """ Starts fuse_kafka processes

        excluded - list of path prefixes; configured directories
                   starting with one of them are not handled

        Does nothing when no directory is left to watch. Missing
        directories are created before the fuse_kafka binary is launched.
        """
        env = os.environ.copy()
        # a missing PATH must not prevent the service from starting
        env["PATH"] = "." + os.pathsep + env.get("PATH", os.defpath)
        # the directory three levels above this script is used as an
        # installation prefix whose binary/library directories are put in
        # front of PATH
        prefix = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))+ os.sep
        for relative in ["usr" + os.sep + "bin", "usr" + os.sep + "lib", "lib", "bin"]:
            env["PATH"] = prefix + relative + os.pathsep + env["PATH"]
        # NOTE(review): this replaces any inherited LD_LIBRARY_PATH value
        # instead of extending it - confirm that this is intended
        env["LD_LIBRARY_PATH"] = os.pathsep + os.sep + "usr" + os.sep + "lib"
        self.configuration = Configuration()
        self.configuration.exclude_from_conf(excluded)
        directories = copy.deepcopy(self.configuration.conf['directories'])
        if directories == []: return
        modprobe = "/sbin/modprobe"
        if os.path.exists(modprobe):
            subprocess.call([modprobe, "fuse"])
        for directory in directories:
            print("starting fuse_kafka on " + directory)
            if not os.path.exists(directory):
                os.makedirs(directory)
        process = subprocess.Popen(
                self.prefix + self.configuration.args(),
                    env = env, shell = (os.sep == '\\'))
        self.do_auditctl_rule(str(process.pid))
        if self.get_status() == 0:
            print("fuse_kafka started")
    def do_auditctl_rule(self, pid, action = '-A'):
        """ Adds or removes the audit rule which excludes reads of
        /var/log/audit/audit.log by the given process

        pid    - process id, as a string
        action - auditctl action flag, '-A' to add the rule

        When adding, the rule is also appended to /etc/audit/audit.rules
        if that file exists. The rule is passed to the auditctl binary
        if it exists.
        """
        rule = action + " exit,never -F "+\
                "path=/var/log/audit/audit.log -F perm=r -F pid="+ pid
        rule_file = "/etc/audit/audit.rules"
        if action == '-A' and os.path.isfile(rule_file):
            with open(rule_file, "a+") as f:
                f.write(rule + "\n")
        if os.path.isfile(self.auditctl_bin):
            # the binary path is kept as one argument whatever it contains
            subprocess.Popen([self.auditctl_bin] + rule.split())
    def remove_auditctl_rules(self):
        """ Removes, for each pid found in the audit.log read rules
        listed by auditctl, the matching rule. Does nothing if the
        auditctl binary is not there. """
        if not os.path.isfile(self.auditctl_bin):
            return
        # run the very binary whose presence was just checked, and read
        # its output as text so that it can be split on python 3 too
        output = subprocess.Popen([self.auditctl_bin, "-l"],
                stdout=subprocess.PIPE,
                universal_newlines=True).communicate()[0]
        p = re.compile(r".*-a.*/var/log/audit/audit.log.*-F perm=r.* pid=([^ ]+).*")
        for rule_line in output.split("\n"):
            found = p.findall(rule_line)
            if len(found) > 0:
                self.do_auditctl_rule(found[0], "-d")
    def stop(self):
        """ Stops fuse_kafka processes """
        if os.sep == '/':
            subprocess.call(["pkill", "-f", " ".join(self.prefix)])
        else:
            subprocess.call(["taskkill", "/f", "/im", self.prefix[0] + ".exe"])
        self.remove_auditctl_rules()
        if self.get_status() != 0:
            print("fuse_kafka stoped")
    def reload(self, var_run_path = "/var/run"):
        """ if fuse_kafka is running, reloads the dynamic part of
        the configuration. If it is not running, starts it

        var_run_path - directory where the fuse_kafka.args file is
                       written

        When running, the new arguments are written to the args file,
        fuse_kafka is started on the directories which are not watched
        yet and the watched directories which are not configured anymore
        are unmounted.
        """
        if self.get_status() == 3:
            self.start()
            return
        self.configuration = Configuration()
        configured_directories = copy.deepcopy(self.configuration.conf['directories'])
        with open(var_run_path + "/fuse_kafka.args", "w") as f:
            # flock() is the call which takes LOCK_EX; fcntl.fcntl() would
            # interpret the constant as an unrelated fcntl command.
            # The lock is released when the file is closed.
            if canlock: fcntl.flock(f, fcntl.LOCK_EX)
            f.write(str(self.configuration))
        watched_directories = self.list_watched_directories()
        self.start_excluding_directories(watched_directories)
        for to_stop_watching in [a for a in watched_directories
                if a not in configured_directories]:
            self.stop_watching_directory(to_stop_watching)
    def stop_watching_directory(self, to_stop_watching):
        """ Stops fuse_kafka process for a specific directory

        to_stop_watching - the mountpoint to umount (lazily, with
                           fusermount, if that command is found)
        """
        if subprocess.call(["which", "fusermount"]) == 0:
            subprocess.call(["fusermount", "-uz", to_stop_watching])
        if self.get_status() != 0:
            print("fuse_kafka stoped")
    def restart(self):
        """ Stops and starts fuse_kafka processes """
        self.stop()
        # wait for the processes to be actually gone
        while self.get_status() == 0: time.sleep(0.1)
        self.start()
    def fuse_kafka_running_at(self, pid):
        """ Checks through /proc if a process is a fuse_kafka one

        pid - process id, as a string

        Returns True if /proc/<pid>/exe resolves to a path ending with
        fuse_kafka or if /proc/<pid>/comm contains fuse_kafka, False
        otherwise.
        """
        exe = "/proc/" + pid + "/exe"
        if os.path.isfile(exe) and \
                os.path.realpath(exe).endswith("fuse_kafka"):
            return True
        comm = "/proc/" + pid + "/comm" # sometime you can't read exe
        if not os.path.isfile(comm): return False
        # the file is closed whatever the outcome
        with open(comm) as f:
            return 'fuse_kafka' in f.read()
    def list_watched_directories(self, var_run_path = "/var/run"):
        """ Lists the directories watched by a running fuse_kafka

        var_run_path - directory containing fuse_kafka/watched

        The watched directory tree is walked looking for
        <directory>/<pid>.pid files. The directory is part of the result
        if the pid is the one of a fuse_kafka process; otherwise the pid
        file is removed.
        """
        result = []
        watched_dir = var_run_path + "/fuse_kafka/watched"
        # the path is escaped so that it is matched literally whatever
        # characters it contains
        p = re.compile("^" + re.escape(watched_dir) + r"(.*)/([^/]+)\.pid$")
        for root, dirnames, filenames in os.walk(watched_dir):
            for filename in fnmatch.filter(filenames, '*.pid'):
                path = os.path.join(root, filename)
                found = p.match(path)
                if found is None:
                    continue
                directory, pid = found.groups()
                if self.fuse_kafka_running_at(pid):
                    result.append(directory)
                else:
                    os.remove(path)
        return result
    def get_status(self):
        """ Prints the watched directories.

        Returns 0 if at least one directory is watched, 3 otherwise.
        """
        status = 3
        for directory in self.list_watched_directories():
            print("listening on " + directory)
            status = 0
        return status
    def status(self):
        """ Displays the status of fuse_kafka processes and exits with
        the status code returned by get_status() """
        status = self.get_status()
        sys.stdout.write("service is ")
        if status == 3: sys.stdout.write("not ")
        print("running")
        sys.exit(status)
    def cleanup(self):
        """ if a fuse kafka mountpoint is not accessible, umount it.
        Also installs this action in the crontab so it is launched
        every minute. """
        Crontab().add_line_if_necessary("* * * * * " + os.path.realpath(__file__) + " cleanup")
        Mountpoints().umount_non_accessible()
# Script entry point: runs the action named by the first command line
# argument, or prints the usage when there is none.
if __name__ == "__main__":
    if len(sys.argv) <= 1:
        print("Usage: " + sys.argv[0] + " start|stop|status|restart|reload...")
    else:
        service = FuseKafkaService()
        action = sys.argv[1]
        # The action is validated before being run: catching AttributeError
        # around the whole call would report any AttributeError raised
        # inside an action as an unknown action and hide the real error.
        if action.startswith("_") or \
                not callable(getattr(service, action, None)):
            print("no action called " + action)
            sys.exit(1)
        service.do(action)
 All Data Structures Files Functions Variables Defines