# fuse_kafka build.py
00001 #!/usr/bin/env python
00002 """ Builds unit test binary """
00003 try:
00004     import base64, subprocess, sys, glob, os, json, thread
00005     import multiprocessing, shutil, time, unittest
00006 except ImportError, e:
00007     print "failed importing module", e
00008 from fabricate import *
def get_define(source, name):
    """ Returns the current define for fuse_kafka based on src/<source>.h

    source - basename (without extension) of the header under src/
    name - the macro name to look up

    Matches lines of the exact form `#define NAME "value"` (three
    whitespace-separated tokens) and returns the value with its first and
    last characters (the quotes) stripped.

    Raises ValueError when the define is not found (the previous
    implementation looped forever at end of file).
    """
    with open("src/" + source + ".h") as f:
        for line in f:
            parts = line.split()
            if len(parts) == 3 and parts[0] == "#define" and parts[1] == name:
                # strip the surrounding quote characters from the value
                return parts[-1][1:-1]
    raise ValueError("#define {} not found in src/{}.h".format(name, source))
class Plugins:
    """ Discovers input/output plugins under src/plugins/ and records, per
    plugin, the libraries, includes, target, object names and test paths
    needed to build and test it """
    def get_macro_definition(self, name, path):
        """ Extracts the argument of a one-line macro call `name(...)`
        appearing at the start of a line in <path>.c (shelling out to
        grep/sed); returns '' when the macro is absent """
        cmd = "sh -c \"grep --color=no '^"+name+"(' " + path + ".c | sed 's/^"+name+"(\(.*\))$/\\1/'\""
        return os.popen(cmd).read().rstrip()
    def get_target(self, path):
        """ Returns the compilation target regex declared by the plugin,
        or '.*' (matches any target) when none is declared """
        target_str = self.get_macro_definition("target", path)
        if target_str == '': return '.*'
        return target_str
    def get_requirements(self, path):
        """ Returns the requirements declared by the plugin via the
        requires(...) macro, one per line, or an empty list """
        required_str = self.get_macro_definition("requires", path)
        required = []
        if required_str != '': required = required_str.split("\n")
        return required
    def __init__(self, cc):
        self.cc = cc
        _dir = "src/plugins/input/"
        # *_test.c files are unit tests, every other .c file is a plugin
        self.tests_sources = [os.path.splitext(os.path.basename(a))[0] for a in glob.glob(_dir + "*_test.c")]
        self.tests_paths = [_dir.replace("src/", "") + x for x in self.tests_sources]
        self.libraries_sources = [os.path.splitext(os.path.basename(a))[0] for a in glob.glob(_dir + "*.c")]
        self.libraries_sources = [x for x in self.libraries_sources if x not in self.tests_sources]
        self.libs_of = {}          # plugin -> libraries to link against
        self.targets_of = {}       # plugin -> target regex
        self.includes_of = {}      # plugin -> declared requirements
        self.kind_of = {}          # plugin -> 'input' or 'output'
        self.shareds_objects = {}  # plugin -> shared object file name
        self.objects = {}          # plugin -> object file name
        self.test_of = {}          # plugin -> test source path (without src/)
        # NOTE(review): fill_up('input') re-scans the directory scanned
        # just above, so input tests/sources are added twice — confirm
        # whether this duplication is intended
        for kind in ['input', 'output']:
            self.fill_up(kind)
    def fill_up(self, kind):
        """ Scans src/plugins/<kind>/ and registers every plugin found """
        _dir = "src/plugins/" + kind + "/"
        self.tests_sources += [os.path.splitext(os.path.basename(a))[0] for a in glob.glob(_dir + "*_test.c")]
        # NOTE(review): this prefixes _dir to the *cumulative* test list,
        # so tests found for previous kinds are re-added under this kind's
        # directory — looks unintended, confirm
        self.tests_paths += [_dir.replace("src/", "") + x for x in self.tests_sources]
        current_sources = [os.path.splitext(os.path.basename(a))[0] for a in glob.glob(_dir + "*.c")]
        current_sources = [x for x in current_sources if x not in self.tests_sources]
        # kind-specific object name prefix read from src/version.h
        prefix = get_define("version", kind.upper() + "_PLUGIN_PREFIX")
        for lib in current_sources:
            self.kind_of[lib] = kind
            required = self.get_requirements(_dir + lib)
            self.targets_of[lib] = self.get_target(_dir + lib)
            self.libs_of[lib] = required + default_libs
            self.includes_of[lib] = required
            self.test_of[lib] = ((_dir + lib) +  "_test").replace("src/", "")
            self.shareds_objects[lib] = prefix + lib + ".so"
            self.objects[lib] = prefix + lib + ".o"
        self.libraries_sources +=  current_sources
# compiler, overridable through the CC environment variable
cc = 'gcc'
if "CC" in os.environ:
    cc = os.environ["CC"]
# extra compiler/linker flags taken from the environment
_flags = []
for flag in ["CFLAGS", "LDFLAGS"]:
    if flag in os.environ:
        _flags = _flags + os.environ[flag].split()
zookeeper_type = "zookeeper_st"
# NOTE(review): this re-assigns the same constant, so the condition is a
# no-op — possibly meant to read a value from the environment instead
if "zookeeper_st" in os.environ:
    zookeeper_type = "zookeeper_st"
cc = [cc, _flags]
sources = ['fuse_kafka']
binary_name = sources[0]
common_libs = ["m", "dl", "pthread", "jansson"]#, "ulockmgr"]
libs = [zookeeper_type, "rdkafka",  "z"] + common_libs
# libraries every plugin links against by default
default_libs = ["m",  zookeeper_type, "rdkafka", "jansson", "dl"]
if "LIBS" in os.environ:
    additional_libs = [a.replace("-l", "") for a in os.environ["LIBS"].split()]
    default_libs += additional_libs
    libs += additional_libs
plugins = Plugins(cc)
# embed the current git commit hash into the binary via -DCOMMIT
flags = ['-D_FILE_OFFSET_BITS=64', '-DCOMMIT="' + os.popen("git log --format=%H -n 1").read().rstrip() + '"']
if "CFLAGS" in os.environ:
    flags = os.environ["CFLAGS"].split() + flags
# extra flags for test builds: gcov coverage instrumentation
test_flags = ['-fprofile-arcs', '-ftest-coverage', '-DTEST="out"']
# versions and download locations of the logstash and kafka distributions
# used by the integration helpers below
logstash_directory = "logstash-1.5.2" 
logstash_archive = logstash_directory + ".tar.gz"
logstash_server = "https://download.elastic.co/logstash/logstash"
kafka_server = "http://mir2.ovh.net/ftp.apache.org/dist/kafka/"
kafka_version = "0.8.1.1"
scala_version = "2.8.0"
kafka_directory = "kafka_" + scala_version + "-" + kafka_version
kafka_archive = kafka_directory + ".tgz"
kafka_bin_directory = kafka_directory + "/bin/"
kafka_config_directory = kafka_directory + "/config/"
class Benchmark:
    """ Runs bonnie++ on /tmp/fuse-kafka-test and parses its CSV output
    into a nested result dictionary """
    def run_low_level(self):
        """ Runs bonnie++ quietly and returns its raw CSV output """
        return os.popen("bonnie++ -q -n 128 -d /tmp/fuse-kafka-test").read()
    def run(self):
        """ Runs the benchmark and returns the parsed results

        Returns a dictionary with version/host metadata and one entry per
        benchmarked action, each parsed with parse_action
        """
        s = self.run_low_level().split(",")
        create_subsections = ["create", "read", "delete"]
        output_subsections = ["per char", "block", "rewrite"]
        input_subsections = ["sequential per char",
                "sequential block", "random seeks"]
        res = {
                "version": s[0],
                "hostname": s[2],
                "timestamp": int(s[4]),
                "files": int(s[19]),
                "sequential create": self.parse_action(
                    s, 24, 0, create_subsections),
                "random create": self.parse_action(s, 24, 6,
                    create_subsections),
                "sequential output": self.parse_action(s, 7, 0,
                    output_subsections),
                "input": self.parse_action(s, 7, 6,
                    input_subsections),
                }
        return res
    def int(self, i):
        """ Parses a bonnie++ numeric field: '+'-prefixed fields mean
        'not measured' and map to 0, anything else is parsed as float """
        if i[0] == '+': return 0
        return float(i)
    def p(self, s, _, d, subsections, i, n):
        """ Parses one subsection (rate, cpu percentage and latency)

        s - the CSV fields, _ - base column, d - action offset,
        i - subsection column offset, n - latency column offset
        """
        return {
                "per second": self.int(s[_ + i + d]),
                "cpu percent": self.int(s[_ + i + 1 + d]),
                # floor division: a plain / yields a float index under
                # python 3 (d is always an even integer offset)
                "latency": s[_ + n + d // 2]
                }
    def parse_action(self, s, _, d, subsections):
        """ Parses the three subsections of one benchmark action,
        keyed by the given subsection names """
        return {
                subsections[0]: self.p(s, _, d, subsections, 0, 18),
                subsections[1]: self.p(s, _, d, subsections, 2, 19),
                subsections[2]: self.p(s, _, d, subsections, 4, 20)
                }
class Benchmarks:
    """ Runs the Benchmark suite with and without fuse_kafka mounted and
    saves the comparison to benchs/results.js """
    def generate(self):
        """ Runs both benchmarks and writes the javascript results file
        read by the benchs/ visualization page """
        result = {}
        os.system('./src/fuse_kafka.py start')
        result["with fuse kafka"] = Benchmark().run()
        os.system('./src/fuse_kafka.py stop')
        result["without fuse kafka"] = Benchmark().run()
        # `with` guarantees the file is closed even if dumps fails;
        # json is already imported at module level, the former local
        # import was redundant
        with open("benchs/results.js", "w") as f:
            f.write("function get_results() { \nreturn "\
                    + json.dumps(result, sort_keys=True,
                        indent=4, separators=(',', ': ')) + "; \n}\n")
class FuseKafkaLog:
    """ Utility to read messages from kafka based on fuse kafka format """
    def __init__(self, zkconnect = "localhost"):
        # select: optional whitelist of field names to display, set by start()
        self.select = None
        self.zkconnect = zkconnect
    def run_command(self, *command):
        """ Run an interactive command line

        command - the command to run as an argument array

        Returns an iterator to the command stdout
        """
        p = subprocess.Popen(command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        # iterate line by line until readline returns '' (stdout closed)
        return iter(p.stdout.readline, '')
    def pretty_print(self, string):
        """ Displays a json logstash/fuse_kafka event in a user friendly fashion
        
        string - the JSON input logstash/fuse_kafka event

        Example

            pretty_print('{"command": "bXkgY29tbWFuZA==", "@message": "bXkgbWVzc2FnZQ==", '
                + '"fields": {"a": "v"}, "tags": ["tag"]}')

            prints:

                event:
                  message_size-added: 0
                  fields:
                     a: v
                  command: my command
                  @message: my message
                  tags:
                    -  tag
        """
        struct = self.load_fuse_kafka_event(string)
        print "event:"
        for key in struct:
            # honor the optional field whitelist
            if self.select != None and not key in self.select:
                continue
            sys.stdout.write("  " + key + ": ")
            value = struct[key]
            if type(value) is dict:
                print
                for name in value:
                    print "    ", name + ':', value[name]
            elif type(value) is list:
                print
                for v in value:
                    print "    - ", v
            else:
                print value
    def load_fuse_kafka_event(self, string):
        """ Decodes a json logstash/fuse_kafka event string, i.e.:
                - does a json decoding
                - decodes @message and command fields 
                - adds message_size-added field 
        
        string - the JSON input logstash/fuse_kafka event

        Example
            
            build.FuseKafkaLog().load_fuse_kafka_event(
              '{"command": "bXkgY29tbWFuZGU=", "@message": "bXkgbWVzc2FnZQ=="}')

            => {'message_size-added': 10,
                u'command': 'my commande',
                u'@message': 'my message'}

        Returns the decoded json object
        """
        event = json.loads(string)
        for item in ["@message", "command"]:
            # pad the base64 payload to a multiple of 4
            event[item] += "=" * ((4 - len(event[item]) % 4) % 4)
            # NOTE(review): two more '=' are appended on top of the padding
            # above; python 2's decoder tolerates the surplus padding —
            # confirm both lines are really needed
            event[item] = base64.b64decode(event[item] + "==")
        event["message_size-added"] = len(event["@message"])
        return event
    def start(self):
        """ Launches a kafka console consumer and pretty prints 
            fuse_kafka/logstash events from this consumer

        - SELECT (environment variable): if defined, lists what field 
            names should be retrieved (whitespace separated)
        """
        if os.environ.get('SELECT') != None:
            self.select = os.environ.get('SELECT').split()
        for line in self.run_command(os.getcwd() + "/"
                + kafka_bin_directory + 'kafka-console-consumer.sh',
            "--zookeeper", self.zkconnect, "--topic", "logs"):
            try:
                self.pretty_print(line)
            except ValueError:
                # not valid json: print the raw consumer line
                print line
def get_version():
    """ Returns the current version for fuse_kafka based on src/version.h
    (the VERSION #define, quotes stripped) """
    return get_define("version", "VERSION")
def bump_version():
    """ Changes the version number if the v variable is specified:
            - The version number is changed in src/version.h
            - New packaging files are created with their version bumped
        displays a Usage message otherwise

    Example

        $ ./build.py bump_version
        Usage: $ v=0.1.4 ./build.py bump_version

    - v (environment variable): the new version
    """
    v = os.environ.get('v')
    if v == None:
        print("Usage: $ v=" + get_version() + " " + sys.argv[0] + " bump_version")
        return
    previous_v = get_version()
    # rewrite the Version: field of each packaging descriptor in place
    for ext in ["spec", "dsc"]:
        path = "./packaging/fuse_kafka.{}".format(ext)
        run("sed", "-i", "s/^\(Version: \).*/\\1{}/".format(v), path)
    # rewrite the VERSION #define in the header
    run("sed", "-i", "s/\(#define VERSION \).*$/\\1\\\"{}\\\"/".format(v), "src/version.h")
    # print() call form (not the py2 statement) for consistency with the
    # rest of the file and python 3 forward compatibility
    print("version bumped from {} to {} ".format(previous_v, v))
def version():
    """ Displays the current version number on stdout

    Example

        $ ./build.py version
        0.1.4
    """
    print(get_version())
def package():
    """ Generates a tar.gz corresponding to current directory in the parent directory,
    excluding .git, .nfs*, out directory

    Example

        ./build.py package
        tar --transform s,^.,fuse_kafka-0.1.4, --exclude=.git --exclude=.nfs* --exclude=out -czf ../fuse_kafka-0.1.4.tar.gz .
    """
    clean()
    name = binary_name + "-" + get_version()
    tar = "../" + name + ".tar.gz"
    # --exclude=.git was documented above (and in the example) but was
    # missing from the actual command
    run("tar", "--transform", "s,^.," + name + ",",
            "--exclude=.git", "--exclude=.nfs*",
            "--exclude=out", "-czf", tar , ".")
def rpm():
    """ Builds source and binary RPMs from packaging/fuse_kafka.spec,
    staging the source tarball into ~/rpmbuild/SOURCES first """
    version_name = binary_name + "-" + get_version()
    package()
    rpm_sources = os.environ["HOME"] + "/rpmbuild/SOURCES"
    run("mkdir", "-p", rpm_sources)
    run("cp", "../" + version_name + ".tar.gz", rpm_sources)
    run("rpmbuild", "-ba", "--define", "_srcdefattr (-,root,root)",
            "packaging/fuse_kafka.spec")
def filter_link(a):
    """ Filter function for link flags:
    takes a link flag and modifies it if necessary, i.e.
    if the link is -lcrypto, returns a static library path instead
    (when one can be found under /usr/lib*)

    Examples:
        filter_link('-lblah')
        => '-lblah'
        filter_link('-lcrypto')
        => '/usr/lib/x86_64-linux-gnu/libcrypto.a'

    Returns the new gcc option
    """
    if a == "-lcrypto":
        candidates = []
        for pattern in ("/usr/lib*/libcrypto.a", "/usr/lib*/*/libcrypto.a"):
            candidates.extend(glob.glob(pattern))
        # fall through to the original flag when no static library exists
        if candidates:
            return candidates[0]
    return a
def to_links(libs):
    """ Convert a library list to gcc link flags, prepending -l to each name

    Examples:

        to_links(['curl', 'au'])
        => ['-lcurl', '-lau']

    Returns the new converted list
    """
    # the former identity comprehension wrapper (and the dead commented-out
    # filter_link call) added nothing
    return ['-l' + s for s in libs]
def binary_exists(name):
    """ Checks if a binary exists on the PATH (requires 'which' utility)

    name - the binary name to look up

    Returns True if 'which' printed at least one path for the name
    """
    p = subprocess.Popen(["which", name], stdout=subprocess.PIPE)
    output = p.stdout.readlines()
    # reap the child so it does not linger as a zombie process
    p.wait()
    return len(output) > 0
def test():
    """ Compiles and runs unit tests, generating reports in out directory """
    run('rm', '-rf', 'out/*')
    # one report sub-directory per test suite
    for subdir in ("c", "python"):
        run('mkdir', '-p', 'out/' + subdir)
    compile_test()
    test_run()
def build():
    """ Builds fuse_kafka binary: compile every source then link """
    for step in (compile, link):
        step()
def run_c_test(source):
    """ Runs one compiled C test binary, under gdb by default so that a
    crashing test prints its backtrace and local variables

    source - test source path (used to derive the binary name)

    - NO_BATCH (environment variable): if set, gdb runs interactively
    - NO_GDB (environment variable): if set, runs the test without gdb

    Exits the process with the test status when the test fails; prints a
    warning when the binary does not exist.
    """
    bin_path = get_test_bin(source)
    batch = "-batch"
    if os.getenv("NO_BATCH") != None: batch = ""
    if os.path.exists(bin_path):
        gdb = "gdb " + batch + " -return-child-result \
                --eval-command=run --eval-command=where \
                --eval-command='info locals' --args "
        if os.getenv("NO_GDB") != None: gdb = ""
        result = os.system(gdb + "./" + bin_path)
        if result != 0:
            print("failed running {}, RC: {}".format(source, result))
            # sys.exit instead of the site-provided exit() builtin, which
            # is not guaranteed to be available in all interpreters
            sys.exit(result)
    else:
        print("warning: no binary test {}".format(bin_path))
def c_test():
    """ Builds, runs C unit tests, generating coverage reports in out directory """
    compile_test()
    for source in sources: run_c_test(source)
    for library_source in plugins.libraries_sources:
        run_c_test(plugins.test_of[library_source])
    # all test sources, for gcov; the original lambda closed over the
    # leaked loop variable `library_source` instead of its parameter, so
    # every plugin mapped to the same (last) test; a comprehension also
    # keeps this working when map() returns an iterator (python 3)
    tests = sources + [plugins.test_of[x] for x in plugins.libraries_sources]
    run("gcov", ["src/" + x + ".c" for x in tests] ,"-o", ".")
    if binary_exists("lcov"):
        run("lcov", "--no-external", "--rc", "lcov_branch_coverage=1", "-c", "-d", ".", "-o", "./src/coverage.info")
        if binary_exists("genhtml"):
            run("genhtml", "--rc", "lcov_branch_coverage=1", 
                    "./src/coverage.info", "-o", "./out/c")
def python_test():
    """ Runs the python unit tests under coverage and generates the html
    coverage report in out/python """
    run("python-coverage", "run", "src/fuse_kafka_test.py")
    run("find", "out")
    try:
        run("python-coverage", "html", "-d", "out/python")
    except Exception:
        # best effort: the html report is optional, do not fail the build
        # (narrowed from a bare except, which also swallowed SystemExit)
        print("error while generating html coverage report")
def test_run():
    """ Runs the C test suite followed by the python test suite """
    for suite in (c_test, python_test):
        suite()
def to_includes(what):
    """ Returns pkg-config --cflags token lists, one list per requirement
    name in `what` (an empty token list when pkg-config yields nothing) """
    includes = []
    for requirement in what:
        includes.append(os.popen("pkg-config --cflags " + requirement).read().split())
    return includes
def target_matched(target):
    """ Returns True when the compiler's `Target:` line (from cc -v)
    matches the given grep pattern """
    # cc is either a plain command string or [command, flags]
    if type(cc) is str:
        compiler = cc
    else:
        compiler = cc[0]
    cmd = "sh -c '" + compiler + " -v 2>&1|grep Target:|grep \"" + target + "\"'"
    return len(os.popen(cmd).read().split()) > 0
def compile_plugins():
    """ Compiles each plugin into a shared object, skipping plugins whose
    declared target does not match the current compiler target """
    for name in plugins.libraries_sources:
        if not target_matched(plugins.targets_of[name]):
            print("skipping " + name + " plugin because not compiling for target")
            continue
        plugin_source = "./src/plugins/" + plugins.kind_of[name] + "/" + name + '.c'
        # position independent object first, then the shared library
        run(cc, '-g', '-c', '-fpic', '-I', 'src', to_includes(plugins.includes_of[name]), plugin_source, flags, '-o', plugins.objects[name])
        run(cc, '-shared', '-o', plugins.shareds_objects[name], plugins.objects[name], flags, to_links(plugins.libs_of[name]))
def compile():
    """ Compiles *.c files in source directory (plugins first) """
    compile_plugins()
    for name in sources:
        run(cc, '-g', '-c', "./src/" + name + '.c', flags)
def get_test_bin(source):
    """ Returns the test binary name for a source path, flattening any
    directory separators to underscores """
    return "{}.test".format(source.replace("/", "_"))
def compile_test_with_libs(source, libs, includes = None):
    """ Builds one unit test binary with coverage instrumentation

    source - source path under src/ (without extension)
    libs - libraries to link against
    includes - pkg-config requirement names (default: none)

    Prints a warning when the test source does not exist.
    """
    # None sentinel instead of a mutable [] default argument
    if includes is None: includes = []
    path = "./src/" + source + '.c'
    if not os.path.exists(path):
        print("warning: no test for {}".format(path))
    else:
        run(cc, '-I', 'src', to_includes(includes), '-g', '-o', get_test_bin(source), path, flags,
                test_flags, to_links(libs))
def compile_test():
    """ Builds every unit test binary: core sources then plugin tests """
    for name in sources:
        compile_test_with_libs(name, common_libs)
    for plugin in plugins.libraries_sources:
        compile_test_with_libs(plugins.test_of[plugin],
                plugins.libs_of[plugin], plugins.includes_of[plugin])
def link():
    """ Finalizes the binary generation by linking all object files """
    object_files = ['{}.o'.format(s) for s in sources]
    run(cc, '-g', object_files, '-o', binary_name, flags, to_links(libs))
def install():
    """ installs fuse kafka on current system, i.e. installs:
        - fuse_kafka binary in $BUILDROOT/usr/bin
        - fuse_kafka plugins in $BUILDROOT/usr/lib
        - fuse_kafka init scripts in $BUILDROOT/etc/init.d
        - fuse_kafka configuration in $BUILDROOT/etc

    - BUILDROOT (environment variable): the target directory where to install fuse_kafka, 
        if not specified, will be filesystem root ('/')
    """
    root = '/'
    if os.environ.get('BUILDROOT') != None:
        root = os.environ.get('BUILDROOT') + "/"
    build()
    install_directory = root + 'usr/bin/'
    lib_directory = root + 'usr/lib/'
    init_directory = root + 'etc/init.d/'
    conf_directory = root + 'etc/'
    # plain for loops instead of side-effect list comprehensions
    for d in [conf_directory, init_directory, install_directory, lib_directory]:
        run('mkdir', '-p', d)
    # only install plugins built for the current compilation target
    for key in plugins.shareds_objects:
        if not target_matched(plugins.targets_of[key]): continue
        run('cp', plugins.shareds_objects[key], lib_directory)
    run('cp', binary_name, install_directory)
    for init_name in ["fuse_kafka", "fuse_kafka_umounter"]:
        run('cp', 'src/' + init_name + '.py', init_directory + init_name)
    run('cp', 'conf/fuse_kafka.properties',
            conf_directory + "fuse_kafka.conf")
def install_from_source(root, src, source):
    """ Clones, configures, builds and installs one dependency

    root - installation prefix passed to ./configure
    src - directory where sources are cloned
    source - dependency description dictionary (keys: name, url, optional
        branch/append/ignore/pre); missing optional keys are defaulted in
        place (mutating the caller's dictionary, as the original did)

    Raises Exception when configure/make/make install fails (make
    failures are tolerated when source['ignore'] is set).
    """
    path = src + source['name']
    print(">> in " + path)
    source.setdefault('branch', "master")
    source.setdefault('append', "")
    source.setdefault('ignore', False)
    if not os.path.exists(path):
        run("git", "clone", "--single-branch", "--branch", source['branch'], source['url'], cwd = src)
    if 'pre' in source: 
        # optional preparation step run in its own working directory
        pre = source['pre']
        run(*pre['action'], cwd = src + pre['cwd'])
    wd = os.getcwd()
    os.chdir(path)
    try:
        if not os.path.exists("configure"): os.system("autoreconf -if")
        if os.system("./configure --prefix=" + root + " " + source['append']) != 0 or\
            (os.system("make") != 0 and not source['ignore']) or\
            os.system("make install") != 0:
                raise Exception("build failed for " + path)
    finally:
        # restore the working directory even when the build fails
        # (the original left the process in the build directory on error)
        os.chdir(wd)
def install_dependencies():
    """ Downloads and builds third party dependencies (zookeeper, zlib,
    jansson, dlfcn-win32, librdkafka) into $BUILDROOT, cloning under
    $SRCROOT

    - BUILDROOT (environment variable): installation prefix (required)
    - SRCROOT (environment variable): where sources are cloned,
        defaults to /tmp/fuse_kafka_src
    """
    if os.environ.get('BUILDROOT') == None:
        print("no BUILDROOT specified")
        return
    if os.environ.get('SRCROOT') == None:
        # default the source directory and keep going: the original
        # returned here, which made the default unreachable and silently
        # skipped every build
        os.environ['SRCROOT'] = "/tmp/fuse_kafka_src"
    root = os.environ.get('BUILDROOT') + "/"
    src = os.environ.get('SRCROOT') + "/"
    for d in [root, src]: run("mkdir", "-p", d)
    for source in [
            {
                'name'  :   'zookeeper/src/c',
                'pre'   :   {'cwd': 'zookeeper', 'action':  ["ant", "compile_jute"]},
                'url'   :   'https://github.com/fuse-kafka/zookeeper.git',
                'branch':   'mingw',
                'append':    '--host=mingw32',
                },
            {
                'name'  :   'zlib',
                'url'   :   'https://github.com/fuse-kafka/zlib.git',
                },
            {
                'name'  :   'jansson',
                'url'   :   'https://github.com/akheron/jansson',
                'append':    '--host=mingw32',
                },
            {
                'name'  :   'dlfcn-win32',
                'url'   :   'https://github.com/dlfcn-win32/dlfcn-win32',
                'append':    '--cc=' + cc[0]
                },
            {
                'name'  :   'librdkafka',
                'url'   :   'https://github.com/yazgoo/librdkafka',
                'branch':   'win32',
                'ignore':   True,
                },
            ]: install_from_source(root, src, source)
    if target_matched("mingw"):
        # windows compatibility headers shipped with zookeeper's C client
        for header in ["winconfig.h", "winstdint.h"]:
            run("cp", src + "/zookeeper/src/c/include/" + header, root + "/include/")
def binary_archive():
    """ Builds a relocatable binary archive (tar.gz and zip) of fuse_kafka
    and its dependencies, adding windows support files when cross
    compiling for mingw

    - BUILDROOT (environment variable): staging directory, defaults to
        /tmp/<binary>_install
    """
    if os.environ.get('BUILDROOT') == None:
        os.environ['BUILDROOT'] = "/tmp/" + binary_name + "_install"
    install_dependencies()
    install()
    root = (os.environ.get('BUILDROOT') + "/").replace("//", "/")
    main_bin = root + "/usr/bin/" + binary_name
    if target_matched("mingw"):
        # windows needs the .exe suffix and the mingw runtime dlls
        run("cp", main_bin, main_bin + ".exe")
        for dll in ["/usr/x86_64-w64-mingw32/lib/libwinpthread-1.dll",
                "/usr/lib/gcc/x86_64-w64-mingw32/4.8/libgcc_s_sjlj-1.dll"]:
            run("cp", dll, root + "/usr/lib/")
    name = binary_name + "-" + get_version()
    package_name = "../" + name + "-bin"
    tar = package_name + ".tar.gz"
    if target_matched("mingw"):
        # bundle IronPython — presumably to run the python helper scripts
        # on windows; TODO confirm
        _zip = "IronPython-2.7.5.zip"
        run("wget", "http://download-codeplex.sec.s-msft.com/Download/Release?ProjectName=ironpython&DownloadId=970326&FileTime=130623736032830000&Build=21028", "-O", _zip)
        run("unzip", "-o", _zip, "-d", root)
    run("tar", "czf", tar, root,  "-C", root)
    run("zip", "-r", os.getcwd() + "/" + package_name + ".zip", os.path.basename(os.path.dirname(root)), cwd = root + "/..")
def download_and_untargz(archive, directory_url, directory):
    """ Downloads `archive` from directory_url and extracts it, skipping
    each step whose result already exists on disk """
    if not os.path.exists(archive):
        os.system("wget " + directory_url + "/" + archive)
    if not os.path.exists(directory):
        os.system("tar xzf " + archive)
def logstash_download():
    """ Downloads and extracts the logstash distribution if needed """
    download_and_untargz(logstash_archive, logstash_server, logstash_directory)
def tail():
    """ Downloads kafka and logstash then tails fuse_kafka events through
    the logstash configuration in conf/logstash.conf """
    kafka_download()
    logstash_download()
    # run from src/ — the relative paths below expect it; TODO confirm
    # whether logstash.conf itself also relies on this working directory
    os.chdir("src")
    os.system('../' + logstash_directory + '/bin/logstash agent -f ../conf/logstash.conf')
def clean():
    """ Cleanups file generated by this script """
    # fabricate's autoclean removes every output it recorded for this build
    autoclean()
def kafka_download():
    """ Downloads kafka binary distribution archive and uncompresses it
    (no-op when the archive/directory already exist) """
    download_and_untargz(kafka_archive, kafka_server + kafka_version, kafka_directory)
def zookeeper_start():
    """ Does kafka_download() and starts the zookeeper server shipped with
    the kafka distribution """
    kafka_download()
    run(kafka_bin_directory + 'zookeeper-server-start.sh',
            kafka_config_directory + 'zookeeper.properties')
def bench():
    """ Runs the filesystem benchmarks with and without fuse_kafka and
    writes the comparison to benchs/results.js """
    Benchmarks().generate()
def kafka_start():
    """ Does kafka_download() and starts the kafka server """
    kafka_download()
    run(kafka_bin_directory + 'kafka-server-start.sh',
            kafka_config_directory + 'server.properties')
def kafka_consumer_start():
    """ Starts a kafka logstash/fuse_kafka events consumer
        (pretty printing events)

    - zkconnect (environment variable): if specified, launches the kafka
        consumer based on this zookeeper cluster address, otherwise looks
        for a zookeeper on localhost
    """
    FuseKafkaLog(os.environ.get('zkconnect', 'localhost')).start()
def create_topic_command(zkconnect):
    """ Returns the shell command line creating the 'logs' kafka topic

    zkconnect - the zookeeper cluster endpoint to kafka
    """
    command = (kafka_bin_directory + 'kafka-topics.sh --create --topic logs'
            ' --zookeeper {} --partitions 2 --replication-factor 1')
    return command.format(zkconnect)
def wait_for_input():
    """ Blocks until the user presses enter; prints 'done' when reading
    fails (closed stdin, interrupt, ...) """
    try:
        raw_input(">")
    except:
        # bare except: deliberately best-effort, any read failure just
        # lets the caller resume
        print("done")
# configuration lines toggled in conf/fuse_kafka.properties by the write
# tests below, one per input plugin flavor
fuse_kafka_input_line = "fuse_kafka_input=[\"{0}\"]"
overlay_line = fuse_kafka_input_line.format("overlay")
inotify_line = fuse_kafka_input_line.format("inotify")
def ruby_write_tests():
    """ Runs the ruby write tests against the overlay input, then switches
    the configuration to inotify and runs them again """
    write_command = "ruby write_tests.rb /tmp/write_tests."
    print(">> launching overlay write")
    os.system('cat ./conf/fuse_kafka.properties')
    os.system(write_command + "overlay")
    # swap the enabled input from overlay to inotify and restart
    comment_conf(overlay_line)
    uncomment_conf(inotify_line)
    os.system('./src/fuse_kafka.py restart')
    print(">> launching inotify write")
    os.system('cat ./conf/fuse_kafka.properties')
    os.system(write_command + "inotify")
def comment_conf(what, first = "", second = "#"):
    """ Rewrites a line of conf/fuse_kafka.properties in place with sed;
    with the defaults, comments the given line out (see uncomment_conf)

    what - the exact configuration line (brackets are matched as any
        character in the sed pattern)
    first - prefix the line currently has
    second - prefix the line gets
    """
    pattern = what.replace("[", ".").replace("]", ".")
    os.system("sed -i 's/^{}{}$/{}{}/' conf/fuse_kafka.properties".format(
        first, pattern, second, what))
def uncomment_conf(what):
    """ Uncomments a previously commented line in conf/fuse_kafka.properties """
    comment_conf(what, first = "#", second = "")
def write_tests():
    """ Launches kafka, zookeeper, fuse_kafka """
    # start with the overlay input enabled and inotify disabled;
    # ruby_write_tests swaps them half-way through
    uncomment_conf(overlay_line)
    comment_conf(inotify_line)
    quickstart(consumer = False, synchronous_action = "ruby_write_tests")
def quickstart(consumer = True, synchronous_action = "wait_for_input"):
    """ Launches kafka, zookeeper, fuse_kafka and a console consumer locally

    consumer - whether to also start a pretty-printing console consumer
    synchronous_action - name of the function run in the foreground while
        the servers are up (defaults to waiting for user input)
    """
    kafka_download()
    klog = '/tmp/kafka.log'
    zlog = '/tmp/zookeeper.log'
    # NOTE(review): shutil.rmtree raises on plain files — if these log
    # paths are files rather than directories this cleanup fails; confirm
    if os.path.exists(klog): shutil.rmtree(klog)
    if os.path.exists(zlog): shutil.rmtree(zlog)
    p1 = multiprocessing.Process(target=zookeeper_start, args=())
    p2 = multiprocessing.Process(target=kafka_start, args=())
    p3 = None
    if consumer: p3 = multiprocessing.Process(target=kafka_consumer_start, args=())
    p1.start()
    p2.start()
    # retry topic creation until kafka and zookeeper are up
    result = 1
    while result != 0:
        result = os.system(create_topic_command('localhost'))
        time.sleep(0.2)
    os.system('./src/fuse_kafka.py start')
    if consumer: p3.start()
    try:
        # resolve the action by name; the original used globals()[name],
        # which raises KeyError instead of returning None, making its
        # locals() fallback dead code
        f = globals().get(synchronous_action)
        if f is None:
            f = locals().get(synchronous_action)
        if f is not None:
            f()
    except Exception as e:
        # the original `except e:` was itself a NameError at runtime
        print(e)
    p1.terminate()
    p2.terminate()
    if consumer:
        p3.terminate()
        os.system('pkill -9 -f java.*kafka.consumer.ConsoleConsumer')
    os.system('./src/fuse_kafka.py stop')
    os.system(kafka_bin_directory + 'kafka-server-stop.sh')
    os.system(kafka_bin_directory + 'zookeeper-server-stop.sh')
def doc():
    """ generates the project documentation """
    run('mkdir', '-p', 'doc')
    # doxygen reads its configuration from ./Doxyfile
    run("doxygen", "Doxyfile")
00668 class TestMininet(unittest.TestCase):
00669     """ Utility to create a virtual network to test fuse kafka resiliancy """
00670     def impersonate(self, inital_user = True):
00671         """ changes effective group and user ids """
00672         uid = gid = None
00673         if inital_user:
00674             uid = os.getuid()
00675             gid = os.getuid()
00676         else:
00677             stat = os.stat(".")
00678             uid = stat.st_uid
00679             gid = stat.st_gid
00680         os.setegid(gid)
00681         os.seteuid(uid)
00682     def start_network(self):
00683         """ starts-up a single switch topology """
00684         from mininet.topo import Topo
00685         from mininet.net import Mininet
00686         from mininet.node import OVSController
00687         class SingleSwitchTopo(Topo):
00688             "Single Switch Topology"
00689             def __init__(self, count=1, **params):
00690                 Topo.__init__(self, **params)
00691                 hosts = [ self.addHost('h%d' % i) for i in range(1, count + 1) ]
00692                 s1 = self.addSwitch('s1')
00693                 for h in hosts:
00694                     self.addLink(h, s1)
00695         self.net = Mininet(topo = SingleSwitchTopo(4), controller = OVSController)
00696         self.net.start()
00697         self.impersonate(False)
00698     def log_path(self, name):
00699         return "/tmp/{}.log".format(name)
00700     def shell(self):
00701         """ launches mininet CLI """
00702         from mininet.cli import CLI
00703         CLI(self.net)
00704     def clients_initialize(self):
00705         """ initializes clients variables based on hosts """
00706         self.kafka = self.net.get('h1')
00707         self.zookeeper = self.net.get('h2')
00708         self.fuse_kafka = self.net.get('h3')
00709         self.client = self.net.get('h4')
00710         self.hosts = [self.kafka, self.zookeeper, self.fuse_kafka, self.client]
00711         self.switch = self.net.get('s1')
00712         self.java_clients = [self.client, self.kafka, self.zookeeper]
00713     def cmd(self, where, cmd):
00714         import pwd
00715         command = "su {} -c '{}'".format(
00716                 pwd.getpwuid(os.stat(".").st_uid).pw_name, cmd)
00717         print(command)
00718         return where.cmd(command)
00719     def data_directories_cleanup(self):
00720         """ cleanups generated directory """
00721         self.cmd(self.zookeeper, "rm -rf /tmp/kafka-logs /tmp/zookeeper")
00722     def zookeeper_start(self):
00723         """ starts zookeeper server """
00724         self.cmd(self.zookeeper, self.launch.format("zookeeper") 
00725             + kafka_config_directory
00726             + 'zookeeper.properties >> {} 2>&1 &'.format(self.log_path('zookeeper')))
00727     def kafka_start(self):
00728         """ starts kafka server and creates logging topic """
00729         import tempfile
00730         if not hasattr(self, 'kafka_config'):
00731             self.kafka_config = tempfile.NamedTemporaryFile(delete=False)
00732             self.kafka_config.write("zookeeper.connect={}\n".format(self.zookeeper.IP()))
00733             self.kafka_config.write("broker.id=0\n")
00734             self.kafka_config.write("host.name={}\n".format(self.kafka.IP()))
00735             self.kafka_config.close()
00736         self.cmd(self.kafka, self.launch.format("kafka")
00737                 + self.kafka_config.name + ' > {} 2>&1 &'.format(self.log_path('kafka')))
00738         self.cmd(self.kafka, create_topic_command(
00739             self.zookeeper.IP()) + " > {} 2>&1 ".format(self.log_path('create_topic')))
00740     def kafka_stop(self):
00741         """ stops kafka server """
00742         self.cmd(self.kafka, self.stop.format("kafka"))
00743     def zookeeper_stop(self):
00744         """ stops zookeeper server """
00745         self.cmd(self.zookeeper, "pkill -9 -f zookeeper.properties")
00746     def fuse_kafka_start(self):
00747         """ starts fuse_kafka """
00748         cwd = os.getcwd() + "/"
00749         self.fuse_kafka_path = '{}/fuse_kafka'.format(cwd)
00750         conf = "/tmp/conf"
00751         self.cmd(self.fuse_kafka, "mkdir -p {}".format(conf))
00752         self.cmd(self.fuse_kafka, "cp {}conf/fuse_kafka.properties {}".format(cwd, conf))
00753         self.cmd(self.fuse_kafka, "sed -i 's/127.0.0.1/{}/' {}/fuse_kafka.properties"
00754                 .format(self.zookeeper.IP(), conf))
00755         self.cmd(self.fuse_kafka, "ln -s {}/fuse_kafka {}/../fuse_kafka"
00756                 .format(cwd, conf))
00757         for path in glob.glob(cwd + "/*.so"):
00758             self.cmd(self.fuse_kafka, "ln -s {} {}/../{}"
00759                     .format(path, conf, path.split("/")[-1]))
00760         self.cmd(self.fuse_kafka, "ln -s {}/fuse_kafka {}/../fuse_kafka"
00761                 .format(cwd, conf))
00762         self.cmd(self.fuse_kafka, 'bash -c "cd {}/..;{}src/fuse_kafka.py start > {} 2>&1"'
00763                 .format(conf, cwd, self.log_path('fuse_kafka')))
00764     def consumer_start(self):
00765         """ starts fuse_kafka consumer """
00766         if os.path.exists(self.log_path('consumer')):
00767             os.remove(self.log_path('consumer'))
00768         command = os.getcwd() + "/" + kafka_bin_directory
00769         command += "kafka-console-consumer.sh --zookeeper "
00770         command += self.zookeeper.IP() + " --topic logs"
00771         print(command)
00772         self.impersonate() # popen require setns()
00773         self.consumer = self.client.popen(command)
00774         self.impersonate(False)
00775     def tearDown(self):
00776         """ stops fuse_kafka, zookeeper, kafka, cleans their working directory and 
00777         stops the virtual topology """
00778         for host in self.java_clients: self.cmd(host, 'pkill -9 java') 
00779         self.consumer.kill()
00780         self.cmd(self.fuse_kafka, 'src/fuse_kafka.py stop')
00781         os.remove(self.kafka_config.name)
00782         self.data_directories_cleanup()
00783         self.impersonate()
00784         self.net.stop()
00785     def setUp(self):
00786         """ starts the topology, downloads kafka, does a data directory
00787         cleanup in case of previous run """
00788         self.launch = kafka_bin_directory + '{}-server-start.sh '
00789         self.stop = kafka_bin_directory + '{}-server-stop.sh '
00790         self.start_network()
00791         kafka_download()
00792         self.clients_initialize()
00793         self.data_directories_cleanup()
00794         self.components_start()
00795         # wait for fuse-kafka to be ready
00796         time.sleep(2)
00797     def check(self):
00798         self.assertTrue(os.path.exists(self.fuse_kafka_path),
00799             "you must build fuse kafka to run tests")
00800         os.stat("/tmp/fuse-kafka-test")
00801     def get_consumed_events(self, expected_number):
00802         from mininet.util import pmonitor
00803         events = []
00804         log = FuseKafkaLog()
00805         popens = {}
00806         popens[self.client] = self.consumer
00807         for host, line in pmonitor(popens):
00808             self.consumer.poll()
00809             events.append(log.load_fuse_kafka_event(line))
00810             if len(events) >= expected_number:
00811                 break
00812         self.assertEqual(expected_number, len(events))
00813         return events
00814     def write_to_log(self, what = "test"):
00815         self.cmd(self.fuse_kafka, "echo -n {} > /tmp/fuse-kafka-test/xd 2>&1".format(what))
00816     def components_start(self):
00817         """ starts zookeepre, kafka, fuse_kafka, fuse_kafka consumer """
00818         self.zookeeper_start()
00819         self.kafka_start()
00820         self.fuse_kafka_start()
00821         self.consumer_start()
00822     def test_basic(self):
00823         """ runs the topology with a mininet shell """
00824         self.check()
00825         for message in ["hello", "world"]:
00826             self.write_to_log(message)
00827             events = self.get_consumed_events(1)
00828             self.assertEqual(message, events[0]["@message"])
00829         expected = ["foo", "bar"]
00830         for message in expected:
00831             self.write_to_log(message)
00832         actual = [event["@message"] for event in self.get_consumed_events(2)]
00833         self.assertEqual(sorted(expected), sorted(actual))
00834     def test_shutting_down_kafka(self):
00835         self.check()
00836         self.kafka_stop()
00837         self.write_to_log()
00838         self.kafka_start()
00839         self.get_consumed_events(1)
00840     def test_shutting_down_zookeeper(self):
00841         self.check()
00842         self.zookeeper_stop()
00843         self.write_to_log()
00844         self.zookeeper_start()
00845         self.get_consumed_events(1)
00846     def test_bringing_down_kafka(self):
00847         self.check()
00848         self.kafka_stop()
00849         self.write_to_log()
00850         self.kafka_start()
00851         self.get_consumed_events(1)
00852     def test_cutting_kafka(self):
00853         self.check()
00854         self.write_to_log()
00855         self.net.configLinkStatus(self.kafka.name, self.switch.name, "down") 
00856         self.assertRaises(ValueError, self.get_consumed_events, (1))
00857         self.net.configLinkStatus(self.kafka.name, self.switch.name, "up") 
00858         self.get_consumed_events(1)
00859     def test_cutting_zookeeper(self):
00860         self.check()
00861         self.write_to_log()
00862         self.net.configLinkStatus(self.zookeeper.name, self.switch.name, "down") 
00863         # zookeeper being brought down should not influence an already launched producer
00864         self.get_consumed_events(1)
00865     def test_cutting_kafka_periodically(self):
00866         self.check()
00867         ranges = {10: range(3), 1: range(4), 0: range(10)}
00868         for sleep_time in ranges:
00869             print("sleep time: " + str(sleep_time))
00870             for i in ranges[sleep_time]:
00871                 print("loop # " + str(i))
00872                 self.net.configLinkStatus(self.kafka.name, self.switch.name, "down") 
00873                 time.sleep(sleep_time)
00874                 self.assertRaises(ValueError, self.get_consumed_events, (1))
00875                 self.net.configLinkStatus(self.kafka.name, self.switch.name, "up") 
00876                 if sleep_time > 1:
00877                     time.sleep(7) # wait for kafka to be restarted
00878                 self.write_to_log()
00879                 self.get_consumed_events(1)
if __name__ == "__main__":
    # dispatch: the first argument may select a special build mode,
    # anything else is handled by fabricate's main()
    special_modes = ["quickstart", "mininet", "bench", "multiple"]
    if len(sys.argv) > 1 and sys.argv[1] in special_modes:
        mode = sys.argv[1]
        if mode == "multiple":
            # run each named build step, in the order given
            for name in sys.argv[2:]:
                locals()[name]()
        elif mode == "quickstart":
            quickstart()
        elif mode == "bench":
            bench()
        else:
            # "mininet": drop the script name so unittest parses the rest
            sys.argv.pop(0)
            unittest.main()
    else:
        main()
# All Data Structures Files Functions Variables Defines