![]() |
fuse_kafka
|
#!/usr/bin/env python
# chkconfig: 2345 11 88
### BEGIN INIT INFO
# Provides: fuse_kafka
# Required-Start:
# Required-Stop:
# Default-Start: 3 5
# Default-Stop:
# Short-Description: run fuse_kafka
# Description:
### END INIT INFO
""" @package fuse_kafka
Startup script for fuse_kafka.
"""
import copy
import fnmatch
import getopt
import glob
import json
import multiprocessing
import os
import re
import subprocess
import sys
import time

# fcntl is not available on every platform; when it cannot be imported
# the arguments file is written without taking a lock.
canlock = True
try:
    import fcntl
except ImportError:
    canlock = False

# CONFIGURATIONS_PATHS is the list of paths (glob patterns) where the init
# script will look for configurations
CONFIGURATIONS_PATHS = ["./conf/*", "./etc/fuse_kafka.conf", "/etc/fuse_kafka.conf", "/etc/*.txt", "C:/temp/*.txt"]


class Crontab:
    """ manages a crontab """
    def add_line_if_necessary(self, line):
        """ Adds a line to the crontab if it is not there.

        line - the crontab entry to install, without trailing newline
        """
        # universal_newlines makes communicate() exchange text instead of
        # bytes, so the string handling below works on Python 2 and 3
        crontab = subprocess.Popen(["crontab", "-l"],
                stdout = subprocess.PIPE,
                universal_newlines = True).communicate()[0]
        if line in crontab.split("\n"):
            return
        # never glue the new entry onto an unterminated last line
        if crontab and not crontab.endswith("\n"):
            crontab += "\n"
        subprocess.Popen(["crontab"], stdin = subprocess.PIPE,
                universal_newlines = True).communicate(
                        input = crontab + line + "\n")


class Mountpoints:
    """Utility class to umount non-responding or non-writable
    mountpoints"""
    def access(self, path):
        """ Non-blocking check that a path is accessible.

        path - the path to check for write access

        Returns False if the check did not complete within 2 seconds,
        otherwise the result of os.access(path, os.W_OK)
        """
        # the probe runs in a separate process so that a hanging
        # mountpoint cannot block this script
        p = multiprocessing.Process(target=os.access, args=(path, os.W_OK))
        p.start()
        p.join(2)
        if p.is_alive():
            p.terminate()
            p.join()
            return False
        return os.access(path, os.W_OK)

    def umount_non_accessible(self):
        """ For each configured directory, checks if the directory is
        accessible. If it is still not accessible 10 seconds after the
        first failed check, umount it """
        for path in Configuration().conf['directories']:
            if not self.access(path):
                time.sleep(10)
                if not self.access(path):
                    subprocess.call(["fusermount", "-uz", path])


class Configuration:
    """ Utility class to load configurations from properties files """
    def get_property(self, path, name):
        """ Get a property from a well defined property file.

        path - configuration file path
        name - property name

        Returns the first property value found in the given path with the
        given name, None if it was not found
        """
        with open(path) as f:
            for line in f:
                pair = line.split('=', 1)
                if len(pair) == 2 and pair[0] == name:
                    return pair[1].strip()
        return None

    def includes_subdir(self, dirs, subdir):
        """ Checks if a subdirectory is included in a list of prefix.

        dirs - list of prefixes
        subdir - path to check for prefix

        Returns True if dirs contains a prefix of subdir, False
        otherwise.
        """
        # plain string-prefix comparison, not a path-component comparison
        return any(subdir.startswith(prefix) for prefix in dirs)

    def exclude_directories(self, paths, prefixes):
        """ Exclude directories from a list of directories based on
        prefixes

        paths - list of paths from which to exclude prefixes
        prefixes - list of prefixes to exclude

        Returns the path list with excluded directories
        """
        return [path for path in paths if not self.includes_subdir(prefixes,
            os.path.realpath(path))]

    def exclude_from_conf(self, paths):
        """ Removes from the configured directories those starting with
        one of the given prefixes.

        paths - list of prefixes to exclude
        """
        # a configuration without any 'directories' entry is treated as
        # an empty list instead of raising KeyError
        self.conf['directories'] = self.exclude_directories(
                self.conf.get('directories', []), paths)

    def __init__(self, configurations = CONFIGURATIONS_PATHS):
        """ Loads the configuration.

        configurations - list of glob patterns of configuration files
        """
        self.configurations = configurations
        self.sleeping = False
        self.load()

    def parse_line(self, line, conf):
        """ Parse a configuration line

        line - the line to parse
        conf - a dictionary which will be updated based on the parsing

        Only keys starting with 'monitoring_logging_' or 'fuse_kafka_',
        or equal to 'monitoring_top_substitutions', are taken into
        account; their value is decoded as JSON.

        Returns the configuration updated based on the line
        """
        # NOTE(review): the nesting of the JSON parsing under the prefix
        # test was reconstructed from a flattened listing - confirm.
        pair = line.split('=', 1)
        if len(pair) != 2:
            return conf
        key = pair[0]
        if not (key.startswith('monitoring_logging_')
                or key.startswith('fuse_kafka_')
                or key == 'monitoring_top_substitutions'):
            return conf
        key = key.replace('monitoring_', '')
        key = key.replace('fuse_kafka_', '')
        key = key.replace('logging_', '').replace('top_', '')
        if key not in conf: conf[key] = []
        parsed = json.loads(pair[1])
        if isinstance(parsed, dict):
            # dictionaries are flattened as key, value, key, value...
            for parsed_key in parsed:
                conf[key].append(parsed_key)
                conf[key].append(parsed[parsed_key])
        else:
            conf[key].extend(parsed)
        return conf

    def is_sleeping(self, var_run_path = "/var/run"):
        """ Returns True if fuse_kafka is in sleep mode """
        return os.path.exists(var_run_path + '/fuse_kafka_backup')

    def unique_directories(self, conf_directories):
        """ return a list with duplicates removed
        (in absolute path) from conf_directories """
        directories = []
        seen = set()
        for directory in conf_directories:
            absolute_directory = os.path.abspath(directory)
            if absolute_directory not in seen:
                seen.add(absolute_directory)
                directories.append(directory)
        return directories

    def load(self, var_run_path = "/var/run"):
        """ Loads configuration from configurations files.

        var_run_path - directory where the sleep mode marker is looked up

        After loading, self.conf always has a 'directories' entry
        (possibly empty) and never has a 'sleep' entry.
        """
        self.conf = {}
        for globbed in self.configurations:
            for config in glob.glob(globbed):
                # patterns such as ./conf/* may match directories
                if not os.path.isfile(config):
                    continue
                with open(config) as f:
                    for line in f:
                        self.parse_line(line, self.conf)
        if self.is_sleeping(var_run_path):
            self.exclude_from_conf(self.conf.get('sleep', []))
        self.conf['directories'] = self.unique_directories(
                self.conf.get('directories', []))
        if 'sleep' in self.conf: del self.conf['sleep']

    def args(self):
        """ Returns the fuse_kafka binary arguments based on the
        parsed configuration """
        result = []
        for key in self.conf:
            result.append('--' + str(key))
            result.extend(str(item) for item in self.conf[key])
        return result

    def __str__(self):
        return " ".join(self.args())


class FuseKafkaService:
    """ Utility class to run multiple fuse_kafka processes as one service """
    def __init__(self):
        self.auditctl_bin = "/sbin/auditctl"
        self.prefix = ["fuse_kafka", "_", "-oallow_other",
                "-ononempty", "-omodules=subdir,subdir=.", "-f", "--"]
        self.proc_mount_path = "/proc/mounts"
        # FUSE_KAFKA_PREFIX allows wrapping the binary with another command
        if "FUSE_KAFKA_PREFIX" in os.environ:
            self.prefix = os.environ["FUSE_KAFKA_PREFIX"].split() + self.prefix

    def do(self, action):
        """ Actually run an action

        action - the action name

        """
        getattr(self, action)()

    def start(self):
        """ Starts fuse_kafka unless it is already running """
        if self.get_status() == 0:
            print("fuse_kafka is already running")
            return
        self.start_excluding_directories([])

    def start_excluding_directories(self, excluded):
        """ Starts fuse_kafka processes

        excluded - list of directory prefixes which must not be watched
        """
        env = os.environ.copy()
        # PATH may be missing from a stripped-down init environment
        env["PATH"] = "." + os.pathsep + env.get("PATH", "")
        prefix = os.path.dirname(os.path.dirname(os.path.dirname(
            os.path.realpath(__file__)))) + os.sep
        for relative in ["usr" + os.sep + "bin", "usr" + os.sep + "lib",
                "lib", "bin"]:
            env["PATH"] = prefix + relative + os.pathsep + env["PATH"]
        env["LD_LIBRARY_PATH"] = os.pathsep + os.sep + "usr" + os.sep + "lib"
        self.configuration = Configuration()
        self.configuration.exclude_from_conf(excluded)
        directories = copy.deepcopy(self.configuration.conf['directories'])
        if not directories: return
        modprobe = "/sbin/modprobe"
        if os.path.exists(modprobe):
            subprocess.call([modprobe, "fuse"])
        for directory in directories:
            print("starting fuse_kafka on " + directory)
            if not os.path.exists(directory):
                os.makedirs(directory)
        # NOTE(review): a single process is launched for all directories
        # (the arguments do not depend on the directory) - placement after
        # the loop was reconstructed from a flattened listing, confirm.
        process = subprocess.Popen(
                self.prefix + self.configuration.args(),
                env = env, shell = (os.sep == '\\'))
        self.do_auditctl_rule(str(process.pid))
        if self.get_status() == 0:
            print("fuse_kafka started")

    def do_auditctl_rule(self, pid, action = '-A'):
        """ Adds or removes the audit rule concerning reads of the audit
        log by a given process.

        pid - the process id, as a string
        action - auditctl action, '-A' to add the rule, '-d' to delete it
        """
        rule = action + " exit,never -F "+\
                "path=/var/log/audit/audit.log -F perm=r -F pid="+ pid
        rule_file = "/etc/audit/audit.rules"
        # only additions are persisted in the rules file
        if os.path.isfile(rule_file) and action == '-A':
            with open(rule_file, "a") as f:
                f.write(rule + "\n")
        if os.path.isfile(self.auditctl_bin):
            subprocess.Popen([self.auditctl_bin] + rule.split())

    def remove_auditctl_rules(self):
        """ Deletes every audit rule matching those added by
        do_auditctl_rule """
        if not os.path.isfile(self.auditctl_bin):
            return
        # run the binary whose presence was just checked, and read its
        # output as text so it can be split on Python 2 and 3
        output = subprocess.Popen([self.auditctl_bin, "-l"],
                stdout = subprocess.PIPE,
                universal_newlines = True).communicate()[0]
        p = re.compile(".*-a.*/var/log/audit/audit.log.*-F perm=r.* pid=([^ ]+).*")
        for l in output.split("\n"):
            found = p.findall(l)
            if found:
                self.do_auditctl_rule(found[0], "-d")

    def stop(self):
        """ Stops fuse_kafka processes """
        if os.sep == '/':
            subprocess.call(["pkill", "-f", " ".join(self.prefix)])
        else:
            subprocess.call(["taskkill", "/f", "/im", self.prefix[0] + ".exe"])
        self.remove_auditctl_rules()
        if self.get_status() != 0:
            print("fuse_kafka stoped")

    def reload(self, var_run_path = "/var/run"):
        """ if fuse_kafka is running, reloads the dynamic part of
        the configuration. If it is not running, starts it

        var_run_path - directory where fuse_kafka.args is written
        """
        if self.get_status() == 3:
            self.start()
            return
        self.configuration = Configuration()
        configured_directories = copy.deepcopy(self.configuration.conf['directories'])
        with open(var_run_path + "/fuse_kafka.args", "w") as f:
            # LOCK_EX is a flock() operation; passing it to fcntl.fcntl()
            # would be interpreted as an fcntl command and take no lock.
            # NOTE(review): confirm the reader side also uses flock().
            if canlock: fcntl.flock(f, fcntl.LOCK_EX)
            f.write(str(self.configuration))
        watched_directories = self.list_watched_directories()
        self.start_excluding_directories(watched_directories)
        for to_stop_watching in watched_directories:
            if to_stop_watching not in configured_directories:
                self.stop_watching_directory(to_stop_watching)

    def stop_watching_directory(self, to_stop_watching):
        """ Stops fuse_kafka process for a specific directory """
        if subprocess.call(["which", "fusermount"]) == 0:
            subprocess.call(["fusermount", "-uz", to_stop_watching])
        if self.get_status() != 0:
            print("fuse_kafka stoped")

    def restart(self):
        """ Stops and starts fuse_kafka processes """
        self.stop()
        while self.get_status() == 0: time.sleep(0.1)
        self.start()

    def fuse_kafka_running_at(self, pid):
        """ Checks via /proc whether a process is a fuse_kafka process.

        pid - the process id, as a string

        Returns True if /proc/<pid>/exe resolves to a path ending with
        fuse_kafka or /proc/<pid>/comm contains fuse_kafka, False otherwise
        """
        exe = "/proc/" + pid + "/exe"
        if os.path.isfile(exe) and \
                os.path.realpath(exe).endswith("fuse_kafka"):
            return True
        comm = "/proc/" + pid + "/comm" # sometime you can't read exe
        if not os.path.isfile(comm): return False
        # the file is closed whatever its content is
        with open(comm) as f:
            return 'fuse_kafka' in f.read()

    def list_watched_directories(self, var_run_path = "/var/run"):
        """ Lists the directories watched by a live fuse_kafka process.

        var_run_path - directory containing fuse_kafka/watched

        Pid files found under <var_run_path>/fuse_kafka/watched are named
        <watched directory>/<pid>.pid; those whose process is not a
        running fuse_kafka are deleted.

        Returns the list of watched directories
        """
        result = []
        watched_dir = var_run_path + "/fuse_kafka/watched"
        # the path and the extension dot are matched literally
        p = re.compile("^" + re.escape(watched_dir) + "(.*)/([^/]+)\\.pid$")
        for root, dirnames, filenames in os.walk(watched_dir):
            for filename in fnmatch.filter(filenames, '*.pid'):
                path = os.path.join(root, filename)
                matches = p.findall(path)
                if not matches:
                    continue
                directory, pid = matches[0]
                if self.fuse_kafka_running_at(pid):
                    result.append(directory)
                else:
                    os.remove(path)
        return result

    def get_status(self):
        """ Displays the status of fuse_kafka processes

        Returns 0 if at least one directory is watched, 3 otherwise
        """
        status = 3
        for directory in self.list_watched_directories():
            print("listening on " + directory)
            status = 0
        return status

    def status(self):
        """ Prints whether the service is running and exits with the
        status code (0 running, 3 not running) """
        status = self.get_status()
        sys.stdout.write("service is ")
        if status == 3: sys.stdout.write("not ")
        print("running")
        sys.exit(status)

    def cleanup(self):
        """ if a fuse kafka mountpoint is not accessible, umount it.
        Also installs this action in the crontab so it is launched
        every minute. """
        Crontab().add_line_if_necessary("* * * * * " + os.path.realpath(__file__) + " cleanup")
        Mountpoints().umount_non_accessible()


if __name__ == "__main__":
    if len(sys.argv) <= 1:
        print("Usage: " + sys.argv[0] + " start|stop|status|restart|reload...")
    else:
        service = FuseKafkaService()
        # the action is looked up before running it, so that an
        # AttributeError raised while the action runs is not reported
        # as an unknown action
        if not callable(getattr(service, sys.argv[1], None)):
            print("no action called " + sys.argv[1])
            sys.exit(1)
        service.do(sys.argv[1])