fuse_kafka
src/test.c
Go to the documentation of this file.
00001 
00002 /* added after modular_input */
00003 #include <sys/types.h>
00004 #include <dirent.h>
00005 #include "fuse.h"
00006 #include "hash.c"
00007 #include "kafka_client.c"
00008 #include "test_config.c"
00009 #include "output.c"
00010 /* end added after modular_input */
00011 #define STRINGIFY(x) #x
00012 #include "minunit.h"
00013 // LCOV_EXCL_START
00014 static char* get_file_content(char* path)
00015 {
00016     struct stat st;
00017     char* content;
00018     FILE* f;
00019     size_t read_nb_item;
00020     mu_assert("failed stating path", !stat(path, &st));
00021     content = (char*) malloc(st.st_size + 1);
00022     mu_assert("failed opening path", f = fopen(path, "r"));
00023     read_nb_item = fread((void*) content, st.st_size, 1, f);
00024     printf("read item number: %lu, file size: %lu\n", read_nb_item, st.st_size);
00025     mu_assert("did not read all file", read_nb_item == 1);
00026     fclose(f);
00027     content[st.st_size] = 0;
00028     return content;
00029 }
00030 static char* test_parse_arguments()
00031 {
00032     config conf;
00033     memset(&conf, 0, sizeof(config));
00034     char* argv[] = {"--topic", "logs", "--fields", "datacenter", "eu-west-1a", 
00035         "--directories", "/usr/local/tomcat1/logs", "--brokers", "server:9092",
00036         "--tags", "gen", "--persist", "no", "--excluded_files", "blah",
00037         "--substitutions", "lol"};
00038     int argc = sizeof(argv)/sizeof(char*);
00039     char* argv2[] = {"--lol"};
00040     char* argv3[] = {"-lol"};
00041     mu_assert("parse arguments failed", parse_arguments(argc, argv, &conf));
00042     mu_assert("parse arguments succeeded", !parse_arguments(1, argv2, &conf));
00043     mu_assert("parse arguments succeeded", parse_arguments(1, argv3, &conf));
00044     return 0;
00045 }
00046 static char* test_utils()
00047 {
00048     char* args[] = {"lol", "xd", "pdtr"};
00049     char* args2[] = {"xd", "--", "--lol"};
00050     char* args3[] = {"xd", "--", "--topic", "test"};
00051     char* container;
00052     *get_command_line_size() = 1;
00053     printf("command line is %s\n", get_command_line(1));
00054     *get_command_line_size() = 256;
00055     mu_assert("cmdline for process #1 should contain init or boot.sh or "
00056             "'/bin/bash /ds/build.sh install' or bash (docker)",
00057             strstr(get_command_line(1), "aW5pd") != NULL
00058             || strstr(get_command_line(1), "Ym9vd") != NULL
00059             || strstr(get_command_line(1), "L2Jpbi9iYXNoIA" /*docker*/) != NULL
00060             || strstr(get_command_line(1), "L2Jpbi9iYXNoIC9kcy9idWlsZC5zaCBpbnN0YWxsIA==") != NULL);
00061     mu_assert("found a process with UINT_MAX as pid!",
00062             !strcmp("", get_command_line(UINT_MAX)));
00063     mu_assert("getting limit failed", get_limit(2, args) == 2);
00064     container = array_to_container_string(args, 3, '[', ']', ',', ',');
00065     mu_assert("parsing argument should have failed",
00066             !fuse_kafka_main(3, args2));
00067     mu_assert("parsing argument should have succeed",
00068             fuse_kafka_main(4, args3) == 0);
00069     free(container);
00070     char* result = concat(args[0], args[1]);
00071     mu_assert("concatenation result should not be null",
00072             result != NULL);
00073     printf("result is %s\n", result);
00074     mu_assert("concatenation should be lol/xd",
00075             strcmp("lol/xd", result) == 0);
00076     free(result);
00077     char timestamp[] = "YYYY-MM-ddTHH:mm:ss.SSS+0000";
00078     set_timestamp(timestamp);
00079     mu_assert("timestamp should not be empty",
00080             timestamp[0] != 0);
00081     char* dir = "/tmp/mylittledir/";
00082     rmdir(dir);
00083     mkdir_p(dir);
00084     DIR* d = opendir(dir);
00085     mu_assert("timestamp should not be empty", d != NULL);
00086     closedir(d);
00087     return 0;
00088 }
00089 static int expect_base64(char* input, char* expected)
00090 {
00091     char* output = base64(input, strlen(input));
00092     int result = (strcmp(expected, output) == 0);
00093     printf("base64 output: %s\n", output);
00094     free(output);
00095     return result;
00096 }
00097 static char* test_utils_base64()
00098 {
00099 #define b64_assert(a, b) {\
00100     mu_assert("base64 encoding \"" a "\" should return \"" b "\"", \
00101             expect_base64(a, b)); }
00102     mu_assert("base64(NULL) should return NULL", base64(NULL, 0) == NULL);
00103     b64_assert("", "")
00104     b64_assert("0", "MA==")
00105     b64_assert("1", "MQ==")
00106     b64_assert("42", "NDI=")
00107     b64_assert("hello, world", "aGVsbG8sIHdvcmxk")
00108     b64_assert("sit amet, consectetur adipiscing elit. Aenean ut gravida.",
00109             "c2l0IGFtZXQsIGNvbnNlY3RldHVyIGFkaXBpc2NpbmcgZWxpdC4gQWVuZWFuIHV0IGdyYXZpZGEu")
00110     b64_assert("sit amet, consectetur adipiscing elit. Aenean ut gravida",
00111             "c2l0IGFtZXQsIGNvbnNlY3RldHVyIGFkaXBpc2NpbmcgZWxpdC4gQWVuZWFuIHV0IGdyYXZpZGE=")
00112     return 0;
00113 }
00114 static char* test_time_queue()
00115 {
00116     time_queue* queue = time_queue_new(10, 42);
00117     time_queue_set(queue, "a");
00118     mu_assert("time queue item should be null",
00119             time_queue_get(queue, "") == NULL);
00120     mu_assert("time queue does not overflows",
00121             time_queue_overflows(queue, "a", 42) == 1);
00122     *(time_queue_get(queue, "a")) -= 1000;
00123     mu_assert("time queue does not overflows",
00124             time_queue_overflows(queue, "a", 42) == 1);
00125     time_queue_set(queue, "a");
00126     time_queue_delete(queue);
00127     return 0;
00128 }
00129 int verbose_string_list_add(string_list** list, char* word)
00130 {
00131     printf("adding %s on a list(%d, %d)\n",
00132             word, (*list)->size, (*list)->max_size);
00133     return string_list_add(list, word);
00134 }
00135 static char* test_string_list_fillup_to(
00136         char* word, string_list* list, size_t size)
00137 {
00138     int i;
00139     for(i = 0; i < size; i++)
00140     {
00141         word[4] = '0' + i;
00142         mu_assert("server list add should work",
00143                 !verbose_string_list_add(&list, word));
00144     }
00145     return 0;
00146 }
00147 static char* test_string_list()
00148 {
00149     string_list* list = NULL, *list2 = NULL, *list3 = NULL;
00150     mu_assert("server list should not contain blah",
00151             !string_list_contains(&list, "blah"));
00152     mu_assert("server list add should work",    
00153             !string_list_add(&list, "blah"));
00154     mu_assert("server list should not contain foo",
00155             !string_list_contains(&list, "foo"));
00156     mu_assert("server list should contain blah",
00157             string_list_contains(&list, "blah"));
00158     char word[10];
00159     strcpy(word, "word_");
00160     test_string_list_fillup_to(word, list,
00161             SERVER_LIST_DEFAULT_MAX_SIZE);
00162     word[3] = 'm';
00163     test_string_list_fillup_to(word, list,
00164             SERVER_LIST_DEFAULT_MAX_SIZE - 1);
00165     *(falloc_fails()) = 1;
00166     word[1] = 'a';
00167     mu_assert("adding word should fail since the list resize should fail",
00168             verbose_string_list_add(&list, word));
00169     *(falloc_fails()) = 0;
00170     *(fcalloc_fails()) = 1;
00171     mu_assert("adding word should fail because of calloc fail",
00172             verbose_string_list_add(&list, word));
00173     mu_assert("list add once should fail",
00174             2 == string_list_add_once(&list, word));
00175     *(fcalloc_fails()) = 0;
00176     string_list_free(&list);
00177     *(fcalloc_fails()) = 1;
00178     mu_assert("creating a new list should fail because of calloc failure",
00179             string_list_new(&list2));
00180     *(fcalloc_fails()) = 0;
00181     string_list_free(&list2);
00182     *(falloc_fails()) = 1;
00183     mu_assert("creating a new list should fail because of malloc failure",
00184             string_list_add(&list3, word));
00185     *(falloc_fails()) = 0;
00186     string_list_free(&list3);
00187     return 0;
00188 }
00189 static char* test_server_list()
00190 {
00191     server_list* list = NULL;
00192     char blah[] = "blah";
00193     char blah_foo[] = "blah,foo";
00194     char foo[] = "foo";
00195     char foo_blah[] = "foo,blah";
00196     char empty[] = "";
00197     mu_assert("adding blah once should succeed",
00198             server_list_add_once(&list, blah));
00199     mu_assert("adding blah once should fail since it was already added",
00200             !server_list_add_once(&list, blah));
00201     mu_assert("adding blah once should succed: foo was not registered",
00202             server_list_add_once(&list, blah_foo));
00203     mu_assert("adding blah once should not succed: foo was registered",
00204             !server_list_add_once(&list, foo));
00205     mu_assert("adding blah once should not succed: foo and blah registered",
00206             !server_list_add_once(&list, foo_blah));
00207     mu_assert("adding empty once should succeed",
00208             server_list_add_once(&list, empty));
00209     mu_assert("adding NULL once should fail",
00210             !server_list_add_once(&list, NULL));
00211     server_list_free(&list);
00212     return 0;
00213 }
00214 static char* test_zookeeper()
00215 {
00216     char* topics[] = {"test"};
00217     char* brokers[] = {0};
00218     rd_kafka_t rk;
00219     kafka_t k;
00220     config conf;
00221     k.conf = &conf;
00222     k.conf->topic = topics;
00223     k.conf->brokers = brokers;
00224     k.rk = &rk;
00225     test_with()->rd_kafka_topic_new_returns_NULL = 0;
00226     mu_assert("zhandle_t should not be null",
00227             initialize_zookeeper("", &k) != NULL);
00228     mu_assert("zhandle_t should be null",
00229             initialize_zookeeper(NULL, &k) == NULL);
00230     test_with()->rd_kafka_topic_new_returns_NULL = 1;
00231     mu_assert("zhandle_t should not be null",
00232             initialize_zookeeper("", &k) != NULL);
00233     test_with()->zoo_get_children_returns = 0;
00234     mu_assert("zhandle_t should not be null",
00235             initialize_zookeeper("", &k) != NULL);
00236     /* for coverage */
00237     rd_kafka_destroy(NULL);
00238     rd_kafka_wait_destroyed(42);
00239     rd_kafka_topic_destroy(NULL);
00240     zookeeper_close(NULL);
00241     set_brokerlist_from_zookeeper(NULL, NULL);
00242     zhandle_t zzh;
00243     set_brokerlist_from_zookeeper(&zzh, NULL);
00244     test_with()->zoo_get_children_returns = 1;
00245     set_brokerlist_from_zookeeper(&zzh, NULL);
00246     char* b = (char*) malloc(20);
00247     set_brokerlist_from_zookeeper(&zzh, b);
00248 #define test_output_expected "a:2181,a:2181"
00249     mu_assert("b should equal " test_output_expected, strcmp(test_output_expected, b) == 0);
00250     free(b);
00251     k.conf->topic_n = 1;
00252     watcher(NULL, 0, 0, 0, &k);
00253     k.conf->brokers = topics;
00254     watcher(NULL, 0, 0, 0, &k);
00255     watcher_add_brokers(&k, "brokers", "topic");
00256     return 0;
00257 }
00258 static char* test_trace()
00259 {
00260     SET_CONFIG;
00261     trace("blah");
00262     return 0;
00263 }
00264 void dynamic_configuration_handler(int argc, char**argv, void* context)
00265 {
00266     *dynamic_configuration_watch_routine_running() = 0;
00267 }
00268 void zktouch(char* path)
00269 {
00270     FILE* f = fopen(path, "w");
00271     char* str = "--zookeepers  test ";
00272     flock(fileno(f), LOCK_EX);
00273     fwrite(str, strlen(str), 1, f);
00274     fclose(f);
00275 }
00276 static char* test_dynamic_configuration()
00277 {
00278     char* line;
00279     char** argv;
00280     int argc;
00281     char* conf_path = "/tmp/fuse_kafka_test_dynamic_configuration";
00282     unlink(conf_path);
00283     unlink("/tmp/fuse_kafka.args");
00284     mu_assert("loading dynamic configuration should fail",
00285             dynamic_configuration_load() == 1);
00286     zktouch(conf_path);
00287     dynamic_configuration_get()->path = conf_path;
00288     mu_assert("parse_line_from_file should return 1",
00289             parse_line_from_file(NULL, NULL, NULL) == 1);
00290     mu_assert("parse_args_from_file should return 1",
00291             parse_args_from_file(NULL, &argc, &argv, &line) == 1);
00292     mu_assert("parse_args_from_file should return 0",
00293             parse_args_from_file(conf_path, &argc, &argv, &line) == 0);
00294     dynamic_configuration_free();
00295     dynamic_configuration_load();
00296     mu_assert("dynamic_configuration_changed should return 1",
00297             dynamic_configuration_changed() == 1); /* TODO should be 0 */
00298     dynamic_configuration_watch_stop();
00299     *(dynamic_configuration_get_last_change()) = 1;
00300     dynamic_configuration_get()->context = (void*) 1;
00301     dynamic_configuration_watch_routine(dynamic_configuration_handler);
00302     mu_assert("dynamic configuration watch routine should have been fired up",
00303             *dynamic_configuration_watch_routine_running() == 0);
00304     *dynamic_configuration_watch_routine_running() = 1;
00305     dynamic_configuration_get()->context = NULL;
00306     unlink(conf_path);
00307     return 0;
00308 }
00309 static char* test_fk_hash()
00310 {
00311     fk_hash hash = fk_hash_new();
00312     fk_hash_put(hash, "test", (void*)42, 1);
00313     printf("test value: %p\n", fk_hash_get(hash, "test", 1));
00314     mu_assert("test should be 42", fk_hash_get(hash, "test", 1) == (void*)42);
00315     // teest hashes the same
00316     fk_hash_put(hash, "teest", (void*)43, 1);
00317     mu_assert("teest should be 43", fk_hash_get(hash, "teest", 1) == (void*)43);
00318     mu_assert("test #2 should be 42", fk_hash_get(hash, "test", 1) == (void*)42);
00319     fk_hash_put(hash, "test", (void*)40, 1);
00320     mu_assert("test #3 should be 40", fk_hash_get(hash, "test", 1) == (void*)40);
00321     fk_hash_remove(hash, "test", 1, 0, 0);
00322     mu_assert("test should be -1", fk_hash_get(hash, "test", 1) == (void*)-1);
00323     fk_hash_put(hash, "teeest", (void*)44, 1);
00324     mu_assert("teeest should be 44", fk_hash_get(hash, "teeest", 1) == (void*)44);
00325     fk_hash_remove(hash, "teeest", 1, 0, 0);
00326     mu_assert("teeest should be -1", fk_hash_get(hash, "teeest", 1) == (void*)-1);
00327     fk_hash_remove(hash, "test", 1, 0, 0);
00328     mu_assert("test should be -1", fk_hash_get(hash, "test", 1) == (void*)-1);
00329     fk_hash_remove(hash, "teest", 1, 0, 0);
00330     mu_assert("test should be -1", fk_hash_get(hash, "teest", 1) == (void*)-1);
00331     fk_hash_delete(hash, 0, 0);
00332     fk_hash_list_delete(fk_hash_list_new(0, 0), 0, 0);
00333     return 0;
00334 }
00335 static char* test_my_input_setup()
00336 {
00337     conf.input_n = 1;
00338     char* argv[] = {"nonexisting"};
00339     conf.input = argv;
00340     mu_assert("test_my_input_setup should return 1", 
00341             my_input_setup(0, argv, 1) == 1);
00342     return 0;
00343 }
00344 static char* test_output()
00345 {
00346     SET_CONFIG;
00347     char* excluded = "excluded";
00348     char* quota = "10000";
00349     test_with()->rd_kafka_conf_set_returns = RD_KAFKA_CONF_OK;
00350     test_with()->rd_kafka_topic_new_returns_NULL = 0;
00351     test_with()->rd_kafka_conf_set_returns = 0;
00352     conf.brokers_n = conf.zookeepers_n = 0;
00353     conf.output_n = 0;
00354     void* output = output_init(NULL);
00355     mu_assert("output_init(NULL) should be null", output == NULL);
00356     output = output_init(&conf);
00357     mu_assert("output should be null", output != NULL);
00358     test_with()->rd_kafka_conf_set_returns = RD_KAFKA_CONF_OK;
00359     output = output_init(&conf);
00360     mu_assert("output should not be null", output != NULL);
00361     conf.quota_queue = time_queue_new(10, 42);
00362     conf.quota_n = 1;
00363     conf.quota = &quota;
00364     conf.topic_n = 0;
00365     output_destroy(output);
00366     conf.output = &quota;
00367     conf.output_n = 1;
00368     output = output_init(&conf);
00369     mu_assert("output should be null with wrong output plugin pecified", output == NULL);
00370     conf.output_n = 0;
00371     output = output_init(&conf);
00372     mu_assert("output is not null", output != NULL);
00373     /* TODO uncomment
00374     mu_assert("sending empty string succeeds",
00375             send_kafka(output, "", 0) == 0);*/
00376     output_write("", "", "", 0, 0);
00377     test_with()->asprintf_sets_NULL = 1;
00378     conf.excluded_files_n = 1;
00379     conf.excluded_files = &excluded;
00380     conf.quota_queue = time_queue_new(10, 42);
00381     conf.quota_n = 1;
00382     conf.quota = &quota;
00383     conf.topic_n = 0;
00384     conf.encoder_n = 0;
00385     ((kafka_t*) output)->rkt = (void*) 1;
00386     fuse_get_context()->private_data = output;
00387     mu_assert("should not write to kafka excluded file",
00388             should_write_to_kafka(excluded, 0) == 0);
00389     mu_assert("should write to kafka not excluded file",
00390             should_write_to_kafka("test", 0) == 1);
00391     mu_assert("actual_kafka_write should return 1 if asprintf is failing",
00392             actual_kafka_write("", "", "", 0, 0) == 1);
00393     test_with()->asprintf_sets_NULL = 0;
00394     output_write("", "", "", 0, 0);
00395     mu_assert("actual_kafka_write should return 0 if asprintf is not failing",
00396             actual_kafka_write("", "", "", 0, 0) == 0);
00397     conf.encoder_n = 1;
00398     char* encoder[] = {"text"};
00399     conf.encoder = encoder;
00400     mu_assert("actual_kafka_write should return 0 if asprintf is not failing",
00401             actual_kafka_write("", "", "", 0, 0) == 0);
00402     conf.encoder_n = 0;
00403     conf.zookeepers_n = conf.brokers_n = 0;
00404     input_setup_internal(0, NULL, &conf);
00405     ((kafka_t*)output)->zhandle = (void*) 1;
00406     setup_from_dynamic_configuration(0, NULL, output);
00407     char* argv[] = {"--zookeepers", "zk"};
00408     int argc = sizeof(argv)/sizeof(char*);
00409     setup_from_dynamic_configuration(argc, argv, output);
00410     output_destroy(output);
00411     return 0;
00412 }
00413 static char* test_plugin()
00414 {
00415     void* f = load_plugin_function(OUTPUT_PLUGIN_PREFIX, "kafka", "blah");
00416     mu_assert("function should not be loaded", f == NULL);
00417     f = load_plugin_function(OUTPUT_PLUGIN_PREFIX, "kafka", "output_setup");
00418     mu_assert("function should be loaded", f != NULL);
00419     return 0;
00420 }
00421 int test_queue_n;
00422 char test_queue_chars[] = {0, 0};
00423 void test_queue_callback(const char *prefix, const char *path, char *buf,
00424         size_t size, off_t offset)
00425 {
00426     test_queue_chars[test_queue_n++] = path[0];
00427 }
00428 static char* test_queue()
00429 {
00430     events_dequeue(test_queue_callback);
00431     *(event_queue_max_size()) = 2;
00432     event_enqueue("a", "a", "a", 1, 0);
00433     event_enqueue("b", "b", "b", 1, 0);
00434     mu_assert("size should be 2", *(event_queue_size()) == 2);
00435     event_enqueue("c", "c", "c", 1, 0);
00436     mu_assert("size should still be 2", *(event_queue_size()) == 2);
00437     event_enqueue("d", "d", "d", 1, 0);
00438     mu_assert("size should again still be 2", *(event_queue_size()) == 2);
00439     test_queue_n = 0;
00440     events_dequeue(test_queue_callback);
00441     mu_assert("enqueued n should be 2", test_queue_n == 2);
00442     mu_assert("enqueued first char should be 'c'", test_queue_chars[0] == 'c');
00443     mu_assert("enqueued second char should be 'd'", test_queue_chars[1] == 'd');
00444     return 0;
00445 }
00446 static char* all_tests()
00447 {
00448     *(fk_sleep_enabled()) = 0;
00449     mu_run_test(test_parse_arguments);
00450     mu_run_test(test_utils);
00451     mu_run_test(test_utils_base64);
00452     printf("a\n");
00453     mu_run_test(test_time_queue);
00454     printf("b\n");
00455     mu_run_test(test_output);
00456     mu_run_test(test_zookeeper);
00457     mu_run_test(test_trace);
00458     mu_run_test(test_string_list);
00459     mu_run_test(test_server_list);
00460     mu_run_test(test_dynamic_configuration);
00461     mu_run_test(test_fk_hash);
00462     mu_run_test(test_my_input_setup);
00463     mu_run_test(test_plugin);
00464     mu_run_test(test_queue);
00465     return 0;
00466 }
00467 // LCOV_EXCL_STOP because we don't want coverage on unit tests
00468 #include "minunit.c"
 All Data Structures Files Functions Variables Defines