fuse_kafka — build.py (build, test and packaging script listing)
00001 #!/usr/bin/env python 00002 """ Builds unit test binary """ 00003 try: 00004 import base64, subprocess, sys, glob, os, json, thread 00005 import multiprocessing, shutil, time, unittest 00006 except ImportError, e: 00007 print "failed importing module", e 00008 from fabricate import * 00009 def get_define(source, name): 00010 """ Returns the current define for fuse_kafka based on src/source """ 00011 f = open("src/" + source + ".h") 00012 result = [] 00013 while True: 00014 result = f.readline().split() 00015 if len(result) == 3 and result[0] == "#define" and result[1] == name: 00016 break; 00017 result = result[-1][1:-1] 00018 f.close() 00019 return result 00020 class Plugins: 00021 def get_macro_definition(self, name, path): 00022 cmd = "sh -c \"grep --color=no '^"+name+"(' " + path + ".c | sed 's/^"+name+"(\(.*\))$/\\1/'\"" 00023 return os.popen(cmd).read().rstrip() 00024 def get_target(self, path): 00025 target_str = self.get_macro_definition("target", path) 00026 if target_str == '': return '.*' 00027 return target_str 00028 def get_requirements(self, path): 00029 required_str = self.get_macro_definition("requires", path) 00030 required = [] 00031 if required_str != '': required = required_str.split("\n") 00032 return required 00033 def __init__(self, cc): 00034 self.cc = cc 00035 _dir = "src/plugins/input/" 00036 self.tests_sources = [os.path.splitext(os.path.basename(a))[0] for a in glob.glob(_dir + "*_test.c")] 00037 self.tests_paths = [_dir.replace("src/", "") + x for x in self.tests_sources] 00038 self.libraries_sources = [os.path.splitext(os.path.basename(a))[0] for a in glob.glob(_dir + "*.c")] 00039 self.libraries_sources = [x for x in self.libraries_sources if x not in self.tests_sources] 00040 self.libs_of = {} 00041 self.targets_of = {} 00042 self.includes_of = {} 00043 self.kind_of = {} 00044 self.shareds_objects = {} 00045 self.objects = {} 00046 self.test_of = {} 00047 for kind in ['input', 'output']: 00048 self.fill_up(kind) 00049 def 
fill_up(self, kind): 00050 _dir = "src/plugins/" + kind + "/" 00051 self.tests_sources += [os.path.splitext(os.path.basename(a))[0] for a in glob.glob(_dir + "*_test.c")] 00052 self.tests_paths += [_dir.replace("src/", "") + x for x in self.tests_sources] 00053 current_sources = [os.path.splitext(os.path.basename(a))[0] for a in glob.glob(_dir + "*.c")] 00054 current_sources = [x for x in current_sources if x not in self.tests_sources] 00055 prefix = get_define("version", kind.upper() + "_PLUGIN_PREFIX") 00056 for lib in current_sources: 00057 self.kind_of[lib] = kind 00058 required = self.get_requirements(_dir + lib) 00059 self.targets_of[lib] = self.get_target(_dir + lib) 00060 self.libs_of[lib] = required + default_libs 00061 self.includes_of[lib] = required 00062 self.test_of[lib] = ((_dir + lib) + "_test").replace("src/", "") 00063 self.shareds_objects[lib] = prefix + lib + ".so" 00064 self.objects[lib] = prefix + lib + ".o" 00065 self.libraries_sources += current_sources 00066 cc = 'gcc' 00067 if "CC" in os.environ: 00068 cc = os.environ["CC"] 00069 _flags = [] 00070 for flag in ["CFLAGS", "LDFLAGS"]: 00071 if flag in os.environ: 00072 _flags = _flags + os.environ[flag].split() 00073 zookeeper_type = "zookeeper_st" 00074 if "zookeeper_st" in os.environ: 00075 zookeeper_type = "zookeeper_st" 00076 cc = [cc, _flags] 00077 sources = ['fuse_kafka'] 00078 binary_name = sources[0] 00079 common_libs = ["m", "dl", "pthread", "jansson"]#, "ulockmgr"] 00080 libs = [zookeeper_type, "rdkafka", "z"] + common_libs 00081 default_libs = ["m", zookeeper_type, "rdkafka", "jansson", "dl"] 00082 if "LIBS" in os.environ: 00083 additional_libs = [a.replace("-l", "") for a in os.environ["LIBS"].split()] 00084 default_libs += additional_libs 00085 libs += additional_libs 00086 plugins = Plugins(cc) 00087 flags = ['-D_FILE_OFFSET_BITS=64', '-DCOMMIT="' + os.popen("git log --format=%H -n 1").read().rstrip() + '"'] 00088 if "CFLAGS" in os.environ: 00089 flags = 
os.environ["CFLAGS"].split() + flags 00090 test_flags = ['-fprofile-arcs', '-ftest-coverage', '-DTEST="out"'] 00091 logstash_directory = "logstash-1.5.2" 00092 logstash_archive = logstash_directory + ".tar.gz" 00093 logstash_server = "https://download.elastic.co/logstash/logstash" 00094 kafka_server = "http://mir2.ovh.net/ftp.apache.org/dist/kafka/" 00095 kafka_version = "0.8.1.1" 00096 scala_version = "2.8.0" 00097 kafka_directory = "kafka_" + scala_version + "-" + kafka_version 00098 kafka_archive = kafka_directory + ".tgz" 00099 kafka_bin_directory = kafka_directory + "/bin/" 00100 kafka_config_directory = kafka_directory + "/config/" 00101 class Benchmark: 00102 def run_low_level(self): 00103 return os.popen("bonnie++ -q -n 128 -d /tmp/fuse-kafka-test").read() 00104 def run(self): 00105 s = self.run_low_level().split(",") 00106 create_subsections = ["create", "read", "delete"] 00107 output_subsections = ["per char", "block", "rewrite"] 00108 input_subsections = ["sequential per char", 00109 "sequential block", "random seeks"] 00110 res = { 00111 "version": s[0], 00112 "hostname": s[2], 00113 "timestamp": int(s[4]), 00114 "files": int(s[19]), 00115 "sequential create": self.parse_action( 00116 s, 24, 0, create_subsections), 00117 "random create": self.parse_action(s, 24, 6, 00118 create_subsections), 00119 "sequential output": self.parse_action(s, 7, 0, 00120 output_subsections), 00121 "input": self.parse_action(s, 7, 6, 00122 input_subsections), 00123 } 00124 return res 00125 def int(self, i): 00126 if i[0] == '+': return 0; 00127 return float(i) 00128 def p(self, s, _, d, subsections, i, n): 00129 return { 00130 "per second": self.int(s[_ + i + d]), 00131 "cpu percent": self.int(s[_ + i + 1 + d]), 00132 "latency": s[_ + n + d/2] 00133 } 00134 def parse_action(self, s, _, d, subsections): 00135 return { 00136 subsections[0]: self.p(s, _, d, subsections, 0, 18), 00137 subsections[1]: self.p(s, _, d, subsections, 2, 19), 00138 subsections[2]: self.p(s, _, d, 
subsections, 4, 20) 00139 } 00140 class Benchmarks: 00141 def generate(self): 00142 import json 00143 result = {} 00144 os.system('./src/fuse_kafka.py start') 00145 result["with fuse kafka"] = Benchmark().run() 00146 os.system('./src/fuse_kafka.py stop') 00147 result["without fuse kafka"] = Benchmark().run() 00148 f = open("benchs/results.js", "w") 00149 f.write("function get_results() { \nreturn "\ 00150 + json.dumps(result, sort_keys=True, 00151 indent=4, separators=(',', ': ')) + "; \n}\n") 00152 f.close() 00153 class FuseKafkaLog: 00154 """ Utility to read messages from kafka based on fuse kafka format """ 00155 def __init__(self, zkconnect = "localhost"): 00156 self.select = None 00157 self.zkconnect = zkconnect 00158 def run_command(self, *command): 00159 """ Run an interactive command line 00160 00161 command - the command to run as an argument array 00162 00163 Returns an iterator to the command stdout 00164 """ 00165 p = subprocess.Popen(command, 00166 stdout=subprocess.PIPE, 00167 stderr=subprocess.STDOUT) 00168 return iter(p.stdout.readline, '') 00169 def pretty_print(self, string): 00170 """ Displays a json logstash/fuse_kafka event in a user friendly fashion 00171 00172 string - the JSON input logstash/fuse_kafka event 00173 00174 Example 00175 00176 pretty_print('{"command": "bXkgY29tbWFuZA==", "@message": "bXkgbWVzc2FnZQ==", ' 00177 + '"fields": {"a": "v"}, "tags": ["tag"]}') 00178 00179 prints: 00180 00181 event: 00182 message_size-added: 0 00183 fields: 00184 a: v 00185 command: my command 00186 @message: my message 00187 tags: 00188 - tag 00189 """ 00190 struct = self.load_fuse_kafka_event(string) 00191 print "event:" 00192 for key in struct: 00193 if self.select != None and not key in self.select: 00194 continue 00195 sys.stdout.write(" " + key + ": ") 00196 value = struct[key] 00197 if type(value) is dict: 00198 print 00199 for name in value: 00200 print " ", name + ':', value[name] 00201 elif type(value) is list: 00202 print 00203 for v in 
value: 00204 print " - ", v 00205 else: 00206 print value 00207 def load_fuse_kafka_event(self, string): 00208 """ Decodes a json logstash/fuse_kafka event string, i.e.: 00209 - does a json decoding 00210 - decodes @message and command fields 00211 - adds message_size-added field 00212 00213 string - the JSON input logstash/fuse_kafka event 00214 00215 Example 00216 00217 build.FuseKafkaLog().load_fuse_kafka_event( 00218 '{"command": "bXkgY29tbWFuZGU=", "@message": "bXkgbWVzc2FnZQ=="}') 00219 00220 => {'message_size-added': 10, 00221 u'command': 'my commande', 00222 u'@message': 'my message'} 00223 00224 Returns the decoded json object 00225 """ 00226 event = json.loads(string) 00227 for item in ["@message", "command"]: 00228 event[item] += "=" * ((4 - len(event[item]) % 4) % 4) 00229 event[item] = base64.b64decode(event[item] + "==") 00230 event["message_size-added"] = len(event["@message"]) 00231 return event 00232 def start(self): 00233 """ Launches a kafka console consumer and pretty prints 00234 fuse_kafka/logstash events from this consumer 00235 00236 - SELECT (environment variable): if defined, lists what field 00237 names should be retrieved (whitespace separated) 00238 """ 00239 if os.environ.get('SELECT') != None: 00240 self.select = os.environ.get('SELECT').split() 00241 for line in self.run_command(os.getcwd() + "/" 00242 + kafka_bin_directory + 'kafka-console-consumer.sh', 00243 "--zookeeper", self.zkconnect, "--topic", "logs"): 00244 try: 00245 self.pretty_print(line) 00246 except ValueError: 00247 print line 00248 def get_version(): 00249 """ Returns the current version for fuse_kafka based on src/version.h """ 00250 return get_define("version", "VERSION") 00251 def bump_version(): 00252 """ Changes the version number if v variable if specified: 00253 - The version number is changed in src/version.h 00254 - New packaging files are created with their version bumped 00255 displays a Usage message otherwise 00256 00257 Example 00258 00259 $ ./build.py 
bump_version 00260 Usage: $ v=0.1.4 ./build.py bump_version 00261 00262 - v (environment variable): the new version 00263 """ 00264 v = os.environ.get('v') 00265 if v == None: 00266 print("Usage: $ v=" + get_version() + " " + sys.argv[0] + " bump_version") 00267 return 00268 previous_v = get_version() 00269 for ext in ["spec", "dsc"]: 00270 path = "./packaging/fuse_kafka.{}".format(ext) 00271 run("sed", "-i", "s/^\(Version: \).*/\\1{}/".format(v), path) 00272 run("sed", "-i", "s/\(#define VERSION \).*$/\\1\\\"{}\\\"/".format(v), "src/version.h") 00273 print "version bumped from {} to {} ".format(previous_v, v) 00274 def version(): 00275 """ Displays the current version number 00276 00277 Example 00278 00279 $ ./build.py version 00280 0.1.4 00281 """ 00282 print(get_version()) 00283 def package(): 00284 """ Generates a tar.gz corresponding to current directory in the parent directory, 00285 excluding .git, .nfs*, out directory 00286 00287 Example 00288 00289 ./build.py package 00290 tar --transform s,^.,fuse_kafka-0.1.4, --exclude=.git --exclude=.nfs* --exclude=out -czf ../fuse_kafka-0.1.4.tar.gz . 00291 """ 00292 clean() 00293 name = binary_name + "-" + get_version() 00294 tar = "../" + name + ".tar.gz" 00295 run("tar", "--transform", "s,^.," + name + ",", 00296 "--exclude=.nfs*", 00297 "--exclude=out", "-czf", tar , ".") 00298 def rpm(): 00299 name = binary_name + "-" + get_version() 00300 package() 00301 sources = os.environ["HOME"] + "/rpmbuild/SOURCES" 00302 run("mkdir", "-p", sources) 00303 tar = "../" + name + ".tar.gz" 00304 run("cp", tar, sources) 00305 run("rpmbuild", "-ba", "--define", "_srcdefattr (-,root,root)", 00306 "packaging/fuse_kafka.spec") 00307 def filter_link(a): 00308 """ Filter function for link flags: 00309 takes a link flag and modifies it if necessary, i.e. 
00310 in the link is -lcrypto, returns a static library path instead 00311 00312 Examples: 00313 filter_link('-lblah') 00314 => '-lblah' 00315 filter_link('-lcrypto') 00316 => '/usr/lib/x86_64-linux-gnu/libcrypto.a' 00317 00318 Returns the new gcc option 00319 """ 00320 if a != "-lcrypto": return a 00321 result = [] 00322 for pattern in ["/usr/lib*/libcrypto.a", "/usr/lib*/*/libcrypto.a"]: 00323 result += glob.glob(pattern) 00324 if len(result) > 0: 00325 return result[0] 00326 else: 00327 return a 00328 def to_links(libs): 00329 """ Convert a library list to gcc link flags Prepends -l to a given list 00330 00331 Examples: 00332 00333 to_links(['curl', 'au']) 00334 => ['-lcurl', '-lau'] 00335 00336 Returns the new converted list 00337 """ 00338 #return [filter_link(a) for a in ['-l'+s for s in libs]] 00339 return [a for a in ['-l'+s for s in libs]] 00340 def binary_exists(name): 00341 """ Checks if a binary exists (requires 'which' utility) 00342 Returns true if the binary exists 00343 """ 00344 cmd = ["which",name] 00345 p = subprocess.Popen(cmd, stdout=subprocess.PIPE) 00346 res = p.stdout.readlines() 00347 if len(res) == 0: 00348 return False 00349 return True 00350 def test(): 00351 """ Compile, Run unit tests generating reports in out directory """ 00352 run('rm', '-rf', 'out/*') 00353 for d in ["c", "python"]: run('mkdir', '-p', 'out/' + d) 00354 compile_test() 00355 test_run() 00356 def build(): 00357 """ Builds fuse_kafka binary """ 00358 compile() 00359 link() 00360 def run_c_test(source): 00361 bin_path = get_test_bin(source) 00362 batch = "-batch" 00363 if os.getenv("NO_BATCH") != None: batch = "" 00364 if os.path.exists(bin_path): 00365 gdb = "gdb " + batch + " -return-child-result \ 00366 --eval-command=run --eval-command=where \ 00367 --eval-command='info locals' --args " 00368 if os.getenv("NO_GDB") != None: gdb = "" 00369 result = os.system(gdb + "./" + bin_path) 00370 if result != 0: 00371 print("failed running {}, RC: {}".format(source, result)) 
00372 exit(result) 00373 else: 00374 print("warning: no binary test {}".format(bin_path)) 00375 def c_test(): 00376 """ Builds, run unit tests, generating coverage reports in out directory """ 00377 compile_test() 00378 for source in sources: run_c_test(source) 00379 for library_source in plugins.libraries_sources: 00380 run_c_test(plugins.test_of[library_source]) 00381 tests = sources + map( 00382 lambda x: plugins.test_of[library_source], 00383 plugins.libraries_sources) 00384 run("gcov", ["src/" + x + ".c" for x in tests] ,"-o", ".") 00385 if binary_exists("lcov"): 00386 run("lcov", "--no-external", "--rc", "lcov_branch_coverage=1", "-c", "-d", ".", "-o", "./src/coverage.info") 00387 if binary_exists("genhtml"): 00388 run("genhtml", "--rc", "lcov_branch_coverage=1", 00389 "./src/coverage.info", "-o", "./out/c") 00390 def python_test(): 00391 run("python-coverage", "run", "src/fuse_kafka_test.py") 00392 run("find", "out") 00393 try: 00394 run("python-coverage", "html", "-d", "out/python") 00395 except: 00396 print("error while generating html coverage report") 00397 def test_run(): 00398 c_test() 00399 python_test() 00400 def to_includes(what): 00401 return [os.popen("pkg-config --cflags " + a).read().split() for a in what] 00402 def target_matched(target): 00403 compiler = cc[0] 00404 if type(cc) is str: 00405 compiler = cc 00406 cmd = "sh -c '" + compiler + " -v 2>&1|grep Target:|grep \"" + target + "\"'" 00407 return len(os.popen(cmd).read().split()) > 0 00408 def compile_plugins(): 00409 for library_source in plugins.libraries_sources: 00410 if not target_matched(plugins.targets_of[library_source]): 00411 print("skipping " + library_source + " plugin because not compiling for target") 00412 continue 00413 run(cc, '-g', '-c', '-fpic', '-I', 'src', to_includes(plugins.includes_of[library_source]), "./src/plugins/" + plugins.kind_of[library_source] + "/" + library_source +'.c', flags, '-o', plugins.objects[library_source]) 00414 run(cc, '-shared', '-o', 
plugins.shareds_objects[library_source], plugins.objects[library_source], flags, to_links(plugins.libs_of[library_source])) 00415 def compile(): 00416 """ Compiles *.c files in source directory """ 00417 compile_plugins() 00418 for source in sources: 00419 run(cc, '-g', '-c', "./src/" + source+'.c', flags) 00420 def get_test_bin(source): 00421 return source.replace("/", "_") +'.test' 00422 def compile_test_with_libs(source, libs, includes = []): 00423 """ Builds unit test binary """ 00424 path = "./src/" + source +'.c' 00425 if not os.path.exists(path): 00426 print("warning: no test for {}".format(path)) 00427 else: 00428 run(cc, '-I', 'src', to_includes(includes), '-g', '-o', get_test_bin(source), path, flags, 00429 test_flags, to_links(libs)) 00430 def compile_test(): 00431 """ Builds unit test binary """ 00432 for source in sources: 00433 compile_test_with_libs(source, common_libs) 00434 for library_source in plugins.libraries_sources: 00435 compile_test_with_libs(plugins.test_of[library_source], 00436 plugins.libs_of[library_source], plugins.includes_of[library_source]) 00437 def link(): 00438 """ Finalize the binary generation by linking all object files """ 00439 objects = [s+'.o' for s in sources] 00440 run(cc, '-g', objects, '-o', binary_name, flags, to_links(libs)) 00441 def install(): 00442 """ installs fuse kafka on current system, i.e. 
installs: 00443 - fuse_kafka binary in $BUILDROOT/usr/bin 00444 - fuse_kafka init script in $BUILDROOT/etc/init.d 00445 - fuse_kafka configuration $BUILDROOT/etc 00446 00447 - BUILDROOT (environment variable): the target directory where to install fuse_kafka, 00448 if not specified, will be filesystem root ('/') 00449 """ 00450 root = '/' 00451 if os.environ.get('BUILDROOT') != None: 00452 root = os.environ.get('BUILDROOT') + "/" 00453 build() 00454 install_directory = root + 'usr/bin/' 00455 lib_directory = root + 'usr/lib/' 00456 init_directory = root + 'etc/init.d/' 00457 conf_directory = root + 'etc/' 00458 [run('mkdir', '-p', d) for d in 00459 [conf_directory, init_directory, install_directory, lib_directory]] 00460 for key in plugins.shareds_objects: 00461 if not target_matched(plugins.targets_of[key]): continue 00462 run('cp', plugins.shareds_objects[key], lib_directory) 00463 run('cp', binary_name, install_directory) 00464 [run('cp', 'src/' + init_name + '.py', init_directory + init_name) 00465 for init_name in ["fuse_kafka", "fuse_kafka_umounter"]] 00466 run('cp', 'conf/fuse_kafka.properties', 00467 conf_directory + "fuse_kafka.conf") 00468 def install_from_source(root, src, source): 00469 path = src + source['name'] 00470 print(">> in " + path) 00471 if not 'branch' in source: source['branch'] = "master" 00472 if not 'append' in source: source['append'] = "" 00473 if not 'ignore' in source: source['ignore'] = False 00474 if not os.path.exists(path): 00475 run("git", "clone", "--single-branch", "--branch", source['branch'], source['url'], cwd = src) 00476 if 'pre' in source: 00477 pre = source['pre'] 00478 run(*pre['action'], cwd = src + pre['cwd']) 00479 wd = os.getcwd() 00480 os.chdir(path) 00481 if not os.path.exists("configure"): os.system("autoreconf -if") 00482 if os.system("./configure --prefix=" + root + " " + source['append']) != 0 or\ 00483 (os.system("make") != 0 and not source['ignore']) or\ 00484 os.system("make install") != 0: 00485 raise 
Exception("build failed for " + path) 00486 os.chdir(wd) 00487 def install_dependencies(): 00488 if os.environ.get('BUILDROOT') == None: 00489 print("no BUILDROOT specified") 00490 return 00491 if os.environ.get('SRCROOT') == None: 00492 os.environ['SRCROOT'] = "/tmp/fuse_kafka_src" 00493 return 00494 root = os.environ.get('BUILDROOT') + "/" 00495 src = os.environ.get('SRCROOT') + "/" 00496 for d in [root, src]: run("mkdir", "-p", d) 00497 for source in [ 00498 { 00499 'name' : 'zookeeper/src/c', 00500 'pre' : {'cwd': 'zookeeper', 'action': ["ant", "compile_jute"]}, 00501 'url' : 'https://github.com/fuse-kafka/zookeeper.git', 00502 'branch': 'mingw', 00503 'append': '--host=mingw32', 00504 }, 00505 { 00506 'name' : 'zlib', 00507 'url' : 'https://github.com/fuse-kafka/zlib.git', 00508 }, 00509 { 00510 'name' : 'jansson', 00511 'url' : 'https://github.com/akheron/jansson', 00512 'append': '--host=mingw32', 00513 }, 00514 { 00515 'name' : 'dlfcn-win32', 00516 'url' : 'https://github.com/dlfcn-win32/dlfcn-win32', 00517 'append': '--cc=' + cc[0] 00518 }, 00519 { 00520 'name' : 'librdkafka', 00521 'url' : 'https://github.com/yazgoo/librdkafka', 00522 'branch': 'win32', 00523 'ignore': True, 00524 }, 00525 ]: install_from_source(root, src, source) 00526 if target_matched("mingw"): 00527 for header in ["winconfig.h", "winstdint.h"]: 00528 run("cp", src + "/zookeeper/src/c/include/" + header, root + "/include/") 00529 def binary_archive(): 00530 if os.environ.get('BUILDROOT') == None: 00531 os.environ['BUILDROOT'] = "/tmp/" + binary_name + "_install" 00532 install_dependencies() 00533 install() 00534 root = (os.environ.get('BUILDROOT') + "/").replace("//", "/") 00535 main_bin = root + "/usr/bin/" + binary_name 00536 if target_matched("mingw"): 00537 run("cp", main_bin, main_bin + ".exe") 00538 for dll in ["/usr/x86_64-w64-mingw32/lib/libwinpthread-1.dll", 00539 "/usr/lib/gcc/x86_64-w64-mingw32/4.8/libgcc_s_sjlj-1.dll"]: 00540 run("cp", dll, root + "/usr/lib/") 00541 name = 
binary_name + "-" + get_version() 00542 package_name = "../" + name + "-bin" 00543 tar = package_name + ".tar.gz" 00544 if target_matched("mingw"): 00545 _zip = "IronPython-2.7.5.zip" 00546 run("wget", "http://download-codeplex.sec.s-msft.com/Download/Release?ProjectName=ironpython&DownloadId=970326&FileTime=130623736032830000&Build=21028", "-O", _zip) 00547 run("unzip", "-o", _zip, "-d", root) 00548 run("tar", "czf", tar, root, "-C", root) 00549 run("zip", "-r", os.getcwd() + "/" + package_name + ".zip", os.path.basename(os.path.dirname(root)), cwd = root + "/..") 00550 def download_and_untargz(archive, directory_url, directory): 00551 if not os.path.exists(archive): 00552 os.system('wget ' + directory_url + "/" + archive) 00553 if not os.path.exists(directory): 00554 os.system('tar xzf ' + archive) 00555 def logstash_download(): 00556 download_and_untargz(logstash_archive, logstash_server, logstash_directory) 00557 def tail(): 00558 kafka_download() 00559 logstash_download() 00560 os.chdir("src") 00561 os.system('../' + logstash_directory + '/bin/logstash agent -f ../conf/logstash.conf') 00562 def clean(): 00563 """ Cleanups file generated by this script """ 00564 autoclean() 00565 def kafka_download(): 00566 """ Downloads kafka binary distribution archive and uncompresses it """ 00567 download_and_untargz(kafka_archive, kafka_server + kafka_version, kafka_directory) 00568 def zookeeper_start(): 00569 """ Does kafka_dowload() and starts zookeeper server """ 00570 kafka_download() 00571 run(kafka_bin_directory + 'zookeeper-server-start.sh', 00572 kafka_config_directory + 'zookeeper.properties') 00573 def bench(): 00574 Benchmarks().generate() 00575 def kafka_start(): 00576 """ Does kafka_dowload() and starts kafka server """ 00577 kafka_download() 00578 run(kafka_bin_directory + 'kafka-server-start.sh', 00579 kafka_config_directory + 'server.properties') 00580 def kafka_consumer_start(): 00581 """ Starts a kafka logstash/fuse_kafka events consumer 00582 (pretty 
printing events) 00583 00584 - zkconnect (environment variable): if specified, launches the kafka 00585 consumer based on this zookeeper cluster address, otherwise looks for a 00586 zookeeper on localhost 00587 """ 00588 zkconnect = os.environ.get('zkconnect') 00589 if zkconnect == None: zkconnect = "localhost" 00590 FuseKafkaLog(zkconnect).start() 00591 def create_topic_command(zkconnect): 00592 """ 00593 Return a command line for creating a new logging topic on kafka cluster 00594 00595 - zkconnect: the zookeeper cluster endpoint to kafka 00596 """ 00597 return kafka_bin_directory + 'kafka-topics.sh --create --topic logs --zookeeper {} --partitions 2 --replication-factor 1'.format(zkconnect) 00598 def wait_for_input(): 00599 try: 00600 raw_input(">") 00601 except: 00602 print("done") 00603 fuse_kafka_input_line = "fuse_kafka_input=[\"{0}\"]" 00604 overlay_line = fuse_kafka_input_line.format("overlay") 00605 inotify_line = fuse_kafka_input_line.format("inotify") 00606 def ruby_write_tests(): 00607 print(">> launching overlay write") 00608 cmd = "ruby write_tests.rb /tmp/write_tests." 
00609 os.system('cat ./conf/fuse_kafka.properties') 00610 os.system(cmd + "overlay") 00611 comment_conf(overlay_line) 00612 uncomment_conf(inotify_line) 00613 os.system('./src/fuse_kafka.py restart') 00614 print(">> launching inotify write") 00615 os.system('cat ./conf/fuse_kafka.properties') 00616 os.system(cmd + "inotify") 00617 def comment_conf(what, first = "", second = "#"): 00618 os.system("sed -i 's/^{}{}$/{}{}/' conf/fuse_kafka.properties".format( 00619 first, what.replace("[", ".").replace("]", "."), second, what)) 00620 def uncomment_conf(what): 00621 comment_conf(what, first = "#", second = "") 00622 def write_tests(): 00623 """ Launches kafka, zookeeper, fuse_kafka """ 00624 uncomment_conf(overlay_line) 00625 comment_conf(inotify_line) 00626 quickstart(consumer = False, synchronous_action = "ruby_write_tests") 00627 def quickstart(consumer = True, synchronous_action = "wait_for_input"): 00628 """ Launches kafka, zookeeper, fuse_kafka and a console consumer locally """ 00629 kafka_download() 00630 klog = '/tmp/kafka.log' 00631 zlog = '/tmp/zookeeper.log' 00632 if os.path.exists(klog): shutil.rmtree(klog) 00633 if os.path.exists(zlog): shutil.rmtree(zlog) 00634 p1 = multiprocessing.Process(target=zookeeper_start, args=()) 00635 p2 = multiprocessing.Process(target=kafka_start, args=()) 00636 p3 = None 00637 if consumer: p3 = multiprocessing.Process(target=kafka_consumer_start, args=()) 00638 p1.start() 00639 p2.start() 00640 result = 1 00641 while result != 0: 00642 result = os.system(create_topic_command('localhost')) 00643 time.sleep(0.2) 00644 os.system('./src/fuse_kafka.py start') 00645 if consumer: p3.start() 00646 try: 00647 f = globals()[synchronous_action] 00648 if f == None: 00649 f = locals()[synchronous_action] 00650 if f != None: 00651 f() 00652 else: 00653 f() 00654 except e: 00655 print(e) 00656 p1.terminate() 00657 p2.terminate() 00658 if consumer: 00659 p3.terminate() 00660 os.system('pkill -9 -f java.*kafka.consumer.ConsoleConsumer') 00661 
os.system('./src/fuse_kafka.py stop') 00662 os.system(kafka_bin_directory + 'kafka-server-stop.sh') 00663 os.system(kafka_bin_directory + 'zookeeper-server-stop.sh') 00664 def doc(): 00665 """ generates the project documentation """ 00666 run('mkdir', '-p', 'doc') 00667 run("doxygen", "Doxyfile") 00668 class TestMininet(unittest.TestCase): 00669 """ Utility to create a virtual network to test fuse kafka resiliancy """ 00670 def impersonate(self, inital_user = True): 00671 """ changes effective group and user ids """ 00672 uid = gid = None 00673 if inital_user: 00674 uid = os.getuid() 00675 gid = os.getuid() 00676 else: 00677 stat = os.stat(".") 00678 uid = stat.st_uid 00679 gid = stat.st_gid 00680 os.setegid(gid) 00681 os.seteuid(uid) 00682 def start_network(self): 00683 """ starts-up a single switch topology """ 00684 from mininet.topo import Topo 00685 from mininet.net import Mininet 00686 from mininet.node import OVSController 00687 class SingleSwitchTopo(Topo): 00688 "Single Switch Topology" 00689 def __init__(self, count=1, **params): 00690 Topo.__init__(self, **params) 00691 hosts = [ self.addHost('h%d' % i) for i in range(1, count + 1) ] 00692 s1 = self.addSwitch('s1') 00693 for h in hosts: 00694 self.addLink(h, s1) 00695 self.net = Mininet(topo = SingleSwitchTopo(4), controller = OVSController) 00696 self.net.start() 00697 self.impersonate(False) 00698 def log_path(self, name): 00699 return "/tmp/{}.log".format(name) 00700 def shell(self): 00701 """ launches mininet CLI """ 00702 from mininet.cli import CLI 00703 CLI(self.net) 00704 def clients_initialize(self): 00705 """ initializes clients variables based on hosts """ 00706 self.kafka = self.net.get('h1') 00707 self.zookeeper = self.net.get('h2') 00708 self.fuse_kafka = self.net.get('h3') 00709 self.client = self.net.get('h4') 00710 self.hosts = [self.kafka, self.zookeeper, self.fuse_kafka, self.client] 00711 self.switch = self.net.get('s1') 00712 self.java_clients = [self.client, self.kafka, 
self.zookeeper] 00713 def cmd(self, where, cmd): 00714 import pwd 00715 command = "su {} -c '{}'".format( 00716 pwd.getpwuid(os.stat(".").st_uid).pw_name, cmd) 00717 print(command) 00718 return where.cmd(command) 00719 def data_directories_cleanup(self): 00720 """ cleanups generated directory """ 00721 self.cmd(self.zookeeper, "rm -rf /tmp/kafka-logs /tmp/zookeeper") 00722 def zookeeper_start(self): 00723 """ starts zookeeper server """ 00724 self.cmd(self.zookeeper, self.launch.format("zookeeper") 00725 + kafka_config_directory 00726 + 'zookeeper.properties >> {} 2>&1 &'.format(self.log_path('zookeeper'))) 00727 def kafka_start(self): 00728 """ starts kafka server and creates logging topic """ 00729 import tempfile 00730 if not hasattr(self, 'kafka_config'): 00731 self.kafka_config = tempfile.NamedTemporaryFile(delete=False) 00732 self.kafka_config.write("zookeeper.connect={}\n".format(self.zookeeper.IP())) 00733 self.kafka_config.write("broker.id=0\n") 00734 self.kafka_config.write("host.name={}\n".format(self.kafka.IP())) 00735 self.kafka_config.close() 00736 self.cmd(self.kafka, self.launch.format("kafka") 00737 + self.kafka_config.name + ' > {} 2>&1 &'.format(self.log_path('kafka'))) 00738 self.cmd(self.kafka, create_topic_command( 00739 self.zookeeper.IP()) + " > {} 2>&1 ".format(self.log_path('create_topic'))) 00740 def kafka_stop(self): 00741 """ stops kafka server """ 00742 self.cmd(self.kafka, self.stop.format("kafka")) 00743 def zookeeper_stop(self): 00744 """ stops zookeeper server """ 00745 self.cmd(self.zookeeper, "pkill -9 -f zookeeper.properties") 00746 def fuse_kafka_start(self): 00747 """ starts fuse_kafka """ 00748 cwd = os.getcwd() + "/" 00749 self.fuse_kafka_path = '{}/fuse_kafka'.format(cwd) 00750 conf = "/tmp/conf" 00751 self.cmd(self.fuse_kafka, "mkdir -p {}".format(conf)) 00752 self.cmd(self.fuse_kafka, "cp {}conf/fuse_kafka.properties {}".format(cwd, conf)) 00753 self.cmd(self.fuse_kafka, "sed -i 's/127.0.0.1/{}/' {}/fuse_kafka.properties" 
00754 .format(self.zookeeper.IP(), conf)) 00755 self.cmd(self.fuse_kafka, "ln -s {}/fuse_kafka {}/../fuse_kafka" 00756 .format(cwd, conf)) 00757 for path in glob.glob(cwd + "/*.so"): 00758 self.cmd(self.fuse_kafka, "ln -s {} {}/../{}" 00759 .format(path, conf, path.split("/")[-1])) 00760 self.cmd(self.fuse_kafka, "ln -s {}/fuse_kafka {}/../fuse_kafka" 00761 .format(cwd, conf)) 00762 self.cmd(self.fuse_kafka, 'bash -c "cd {}/..;{}src/fuse_kafka.py start > {} 2>&1"' 00763 .format(conf, cwd, self.log_path('fuse_kafka'))) 00764 def consumer_start(self): 00765 """ starts fuse_kafka consumer """ 00766 if os.path.exists(self.log_path('consumer')): 00767 os.remove(self.log_path('consumer')) 00768 command = os.getcwd() + "/" + kafka_bin_directory 00769 command += "kafka-console-consumer.sh --zookeeper " 00770 command += self.zookeeper.IP() + " --topic logs" 00771 print(command) 00772 self.impersonate() # popen require setns() 00773 self.consumer = self.client.popen(command) 00774 self.impersonate(False) 00775 def tearDown(self): 00776 """ stops fuse_kafka, zookeeper, kafka, cleans their working directory and 00777 stops the virtual topology """ 00778 for host in self.java_clients: self.cmd(host, 'pkill -9 java') 00779 self.consumer.kill() 00780 self.cmd(self.fuse_kafka, 'src/fuse_kafka.py stop') 00781 os.remove(self.kafka_config.name) 00782 self.data_directories_cleanup() 00783 self.impersonate() 00784 self.net.stop() 00785 def setUp(self): 00786 """ starts the topology, downloads kafka, does a data directory 00787 cleanup in case of previous run """ 00788 self.launch = kafka_bin_directory + '{}-server-start.sh ' 00789 self.stop = kafka_bin_directory + '{}-server-stop.sh ' 00790 self.start_network() 00791 kafka_download() 00792 self.clients_initialize() 00793 self.data_directories_cleanup() 00794 self.components_start() 00795 # wait for fuse-kafka to be ready 00796 time.sleep(2) 00797 def check(self): 00798 self.assertTrue(os.path.exists(self.fuse_kafka_path), 00799 "you 
must build fuse kafka to run tests") 00800 os.stat("/tmp/fuse-kafka-test") 00801 def get_consumed_events(self, expected_number): 00802 from mininet.util import pmonitor 00803 events = [] 00804 log = FuseKafkaLog() 00805 popens = {} 00806 popens[self.client] = self.consumer 00807 for host, line in pmonitor(popens): 00808 self.consumer.poll() 00809 events.append(log.load_fuse_kafka_event(line)) 00810 if len(events) >= expected_number: 00811 break 00812 self.assertEqual(expected_number, len(events)) 00813 return events 00814 def write_to_log(self, what = "test"): 00815 self.cmd(self.fuse_kafka, "echo -n {} > /tmp/fuse-kafka-test/xd 2>&1".format(what)) 00816 def components_start(self): 00817 """ starts zookeepre, kafka, fuse_kafka, fuse_kafka consumer """ 00818 self.zookeeper_start() 00819 self.kafka_start() 00820 self.fuse_kafka_start() 00821 self.consumer_start() 00822 def test_basic(self): 00823 """ runs the topology with a mininet shell """ 00824 self.check() 00825 for message in ["hello", "world"]: 00826 self.write_to_log(message) 00827 events = self.get_consumed_events(1) 00828 self.assertEqual(message, events[0]["@message"]) 00829 expected = ["foo", "bar"] 00830 for message in expected: 00831 self.write_to_log(message) 00832 actual = [event["@message"] for event in self.get_consumed_events(2)] 00833 self.assertEqual(sorted(expected), sorted(actual)) 00834 def test_shutting_down_kafka(self): 00835 self.check() 00836 self.kafka_stop() 00837 self.write_to_log() 00838 self.kafka_start() 00839 self.get_consumed_events(1) 00840 def test_shutting_down_zookeeper(self): 00841 self.check() 00842 self.zookeeper_stop() 00843 self.write_to_log() 00844 self.zookeeper_start() 00845 self.get_consumed_events(1) 00846 def test_bringing_down_kafka(self): 00847 self.check() 00848 self.kafka_stop() 00849 self.write_to_log() 00850 self.kafka_start() 00851 self.get_consumed_events(1) 00852 def test_cutting_kafka(self): 00853 self.check() 00854 self.write_to_log() 00855 
self.net.configLinkStatus(self.kafka.name, self.switch.name, "down") 00856 self.assertRaises(ValueError, self.get_consumed_events, (1)) 00857 self.net.configLinkStatus(self.kafka.name, self.switch.name, "up") 00858 self.get_consumed_events(1) 00859 def test_cutting_zookeeper(self): 00860 self.check() 00861 self.write_to_log() 00862 self.net.configLinkStatus(self.zookeeper.name, self.switch.name, "down") 00863 # zookeeper being brought down should not influence an already launched producer 00864 self.get_consumed_events(1) 00865 def test_cutting_kafka_periodically(self): 00866 self.check() 00867 ranges = {10: range(3), 1: range(4), 0: range(10)} 00868 for sleep_time in ranges: 00869 print("sleep time: " + str(sleep_time)) 00870 for i in ranges[sleep_time]: 00871 print("loop # " + str(i)) 00872 self.net.configLinkStatus(self.kafka.name, self.switch.name, "down") 00873 time.sleep(sleep_time) 00874 self.assertRaises(ValueError, self.get_consumed_events, (1)) 00875 self.net.configLinkStatus(self.kafka.name, self.switch.name, "up") 00876 if sleep_time > 1: 00877 time.sleep(7) # wait for kafka to be restarted 00878 self.write_to_log() 00879 self.get_consumed_events(1) 00880 if __name__ == "__main__": 00881 if len(sys.argv) <= 1 or not (sys.argv[1] in ["quickstart", "mininet", "bench", "multiple"]): 00882 main() 00883 else: 00884 if sys.argv[1] == "multiple": 00885 for arg in sys.argv[2:]: 00886 locals()[arg]() 00887 elif sys.argv[1] == "quickstart": quickstart() 00888 elif sys.argv[1] == "bench": bench() 00889 else: 00890 sys.argv.pop(0) 00891 unittest.main()