![]() |
fuse_kafka
|
00001 00002 /* added after modular_input */ 00003 #include <sys/types.h> 00004 #include <dirent.h> 00005 #include "fuse.h" 00006 #include "hash.c" 00007 #include "kafka_client.c" 00008 #include "test_config.c" 00009 #include "output.c" 00010 /* end added after modular_input */ 00011 #define STRINGIFY(x) #x 00012 #include "minunit.h" 00013 // LCOV_EXCL_START 00014 static char* get_file_content(char* path) 00015 { 00016 struct stat st; 00017 char* content; 00018 FILE* f; 00019 size_t read_nb_item; 00020 mu_assert("failed stating path", !stat(path, &st)); 00021 content = (char*) malloc(st.st_size + 1); 00022 mu_assert("failed opening path", f = fopen(path, "r")); 00023 read_nb_item = fread((void*) content, st.st_size, 1, f); 00024 printf("read item number: %lu, file size: %lu\n", read_nb_item, st.st_size); 00025 mu_assert("did not read all file", read_nb_item == 1); 00026 fclose(f); 00027 content[st.st_size] = 0; 00028 return content; 00029 } 00030 static char* test_parse_arguments() 00031 { 00032 config conf; 00033 memset(&conf, 0, sizeof(config)); 00034 char* argv[] = {"--topic", "logs", "--fields", "datacenter", "eu-west-1a", 00035 "--directories", "/usr/local/tomcat1/logs", "--brokers", "server:9092", 00036 "--tags", "gen", "--persist", "no", "--excluded_files", "blah", 00037 "--substitutions", "lol"}; 00038 int argc = sizeof(argv)/sizeof(char*); 00039 char* argv2[] = {"--lol"}; 00040 char* argv3[] = {"-lol"}; 00041 mu_assert("parse arguments failed", parse_arguments(argc, argv, &conf)); 00042 mu_assert("parse arguments succeeded", !parse_arguments(1, argv2, &conf)); 00043 mu_assert("parse arguments succeeded", parse_arguments(1, argv3, &conf)); 00044 return 0; 00045 } 00046 static char* test_utils() 00047 { 00048 char* args[] = {"lol", "xd", "pdtr"}; 00049 char* args2[] = {"xd", "--", "--lol"}; 00050 char* args3[] = {"xd", "--", "--topic", "test"}; 00051 char* container; 00052 *get_command_line_size() = 1; 00053 printf("command line is %s\n", get_command_line(1)); 00054 *get_command_line_size() = 256; 00055 mu_assert("cmdline for process #1 should contain init or boot.sh or " 00056 "'/bin/bash /ds/build.sh install' or bash (docker)", 00057 strstr(get_command_line(1), "aW5pd") != NULL 00058 || strstr(get_command_line(1), "Ym9vd") != NULL 00059 || strstr(get_command_line(1), "L2Jpbi9iYXNoIA" /*docker*/) != NULL 00060 || strstr(get_command_line(1), "L2Jpbi9iYXNoIC9kcy9idWlsZC5zaCBpbnN0YWxsIA==") != NULL); 00061 mu_assert("found a process with UINT_MAX as pid!", 00062 !strcmp("", get_command_line(UINT_MAX))); 00063 mu_assert("getting limit failed", get_limit(2, args) == 2); 00064 container = array_to_container_string(args, 3, '[', ']', ',', ','); 00065 mu_assert("parsing argument should have failed", 00066 !fuse_kafka_main(3, args2)); 00067 mu_assert("parsing argument should have succeed", 00068 fuse_kafka_main(4, args3) == 0); 00069 free(container); 00070 char* result = concat(args[0], args[1]); 00071 mu_assert("concatenation result should not be null", 00072 result != NULL); 00073 printf("result is %s\n", result); 00074 mu_assert("concatenation should be lol/xd", 00075 strcmp("lol/xd", result) == 0); 00076 free(result); 00077 char timestamp[] = "YYYY-MM-ddTHH:mm:ss.SSS+0000"; 00078 set_timestamp(timestamp); 00079 mu_assert("timestamp should not be empty", 00080 timestamp[0] != 0); 00081 char* dir = "/tmp/mylittledir/"; 00082 rmdir(dir); 00083 mkdir_p(dir); 00084 DIR* d = opendir(dir); 00085 mu_assert("timestamp should not be empty", d != NULL); 00086 closedir(d); 00087 return 0; 00088 } 00089 static int expect_base64(char* input, char* expected) 00090 { 00091 char* output = base64(input, strlen(input)); 00092 int result = (strcmp(expected, output) == 0); 00093 printf("base64 output: %s\n", output); 00094 free(output); 00095 return result; 00096 } 00097 static char* test_utils_base64() 00098 { 00099 #define b64_assert(a, b) {\ 00100 mu_assert("base64 encoding \"" a "\" should return \"" b "\"", \ 00101 expect_base64(a, b)); } 00102 mu_assert("base64(NULL) should return NULL", base64(NULL, 0) == NULL); 00103 b64_assert("", "") 00104 b64_assert("0", "MA==") 00105 b64_assert("1", "MQ==") 00106 b64_assert("42", "NDI=") 00107 b64_assert("hello, world", "aGVsbG8sIHdvcmxk") 00108 b64_assert("sit amet, consectetur adipiscing elit. Aenean ut gravida.", 00109 "c2l0IGFtZXQsIGNvbnNlY3RldHVyIGFkaXBpc2NpbmcgZWxpdC4gQWVuZWFuIHV0IGdyYXZpZGEu") 00110 b64_assert("sit amet, consectetur adipiscing elit. Aenean ut gravida", 00111 "c2l0IGFtZXQsIGNvbnNlY3RldHVyIGFkaXBpc2NpbmcgZWxpdC4gQWVuZWFuIHV0IGdyYXZpZGE=") 00112 return 0; 00113 } 00114 static char* test_time_queue() 00115 { 00116 time_queue* queue = time_queue_new(10, 42); 00117 time_queue_set(queue, "a"); 00118 mu_assert("time queue item should be null", 00119 time_queue_get(queue, "") == NULL); 00120 mu_assert("time queue does not overflows", 00121 time_queue_overflows(queue, "a", 42) == 1); 00122 *(time_queue_get(queue, "a")) -= 1000; 00123 mu_assert("time queue does not overflows", 00124 time_queue_overflows(queue, "a", 42) == 1); 00125 time_queue_set(queue, "a"); 00126 time_queue_delete(queue); 00127 return 0; 00128 } 00129 int verbose_string_list_add(string_list** list, char* word) 00130 { 00131 printf("adding %s on a list(%d, %d)\n", 00132 word, (*list)->size, (*list)->max_size); 00133 return string_list_add(list, word); 00134 } 00135 static char* test_string_list_fillup_to( 00136 char* word, string_list* list, size_t size) 00137 { 00138 int i; 00139 for(i = 0; i < size; i++) 00140 { 00141 word[4] = '0' + i; 00142 mu_assert("server list add should work", 00143 !verbose_string_list_add(&list, word)); 00144 } 00145 return 0; 00146 } 00147 static char* test_string_list() 00148 { 00149 string_list* list = NULL, *list2 = NULL, *list3 = NULL; 00150 mu_assert("server list should not contain blah", 00151 !string_list_contains(&list, "blah")); 00152 mu_assert("server list add should work", 00153 !string_list_add(&list, "blah")); 00154 mu_assert("server list should not contain foo", 00155 !string_list_contains(&list, "foo")); 00156 mu_assert("server list should contain blah", 00157 string_list_contains(&list, "blah")); 00158 char word[10]; 00159 strcpy(word, "word_"); 00160 test_string_list_fillup_to(word, list, 00161 SERVER_LIST_DEFAULT_MAX_SIZE); 00162 word[3] = 'm'; 00163 test_string_list_fillup_to(word, list, 00164 SERVER_LIST_DEFAULT_MAX_SIZE - 1); 00165 *(falloc_fails()) = 1; 00166 word[1] = 'a'; 00167 mu_assert("adding word should fail since the list resize should fail", 00168 verbose_string_list_add(&list, word)); 00169 *(falloc_fails()) = 0; 00170 *(fcalloc_fails()) = 1; 00171 mu_assert("adding word should fail because of calloc fail", 00172 verbose_string_list_add(&list, word)); 00173 mu_assert("list add once should fail", 00174 2 == string_list_add_once(&list, word)); 00175 *(fcalloc_fails()) = 0; 00176 string_list_free(&list); 00177 *(fcalloc_fails()) = 1; 00178 mu_assert("creating a new list should fail because of calloc failure", 00179 string_list_new(&list2)); 00180 *(fcalloc_fails()) = 0; 00181 string_list_free(&list2); 00182 *(falloc_fails()) = 1; 00183 mu_assert("creating a new list should fail because of malloc failure", 00184 string_list_add(&list3, word)); 00185 *(falloc_fails()) = 0; 00186 string_list_free(&list3); 00187 return 0; 00188 } 00189 static char* test_server_list() 00190 { 00191 server_list* list = NULL; 00192 char blah[] = "blah"; 00193 char blah_foo[] = "blah,foo"; 00194 char foo[] = "foo"; 00195 char foo_blah[] = "foo,blah"; 00196 char empty[] = ""; 00197 mu_assert("adding blah once should succeed", 00198 server_list_add_once(&list, blah)); 00199 mu_assert("adding blah once should fail since it was already added", 00200 !server_list_add_once(&list, blah)); 00201 mu_assert("adding blah once should succed: foo was not registered", 00202 server_list_add_once(&list, blah_foo)); 00203 mu_assert("adding blah once should not succed: foo was registered", 00204 !server_list_add_once(&list, foo)); 00205 mu_assert("adding blah once should not succed: foo and blah registered", 00206 !server_list_add_once(&list, foo_blah)); 00207 mu_assert("adding empty once should succeed", 00208 server_list_add_once(&list, empty)); 00209 mu_assert("adding NULL once should fail", 00210 !server_list_add_once(&list, NULL)); 00211 server_list_free(&list); 00212 return 0; 00213 } 00214 static char* test_zookeeper() 00215 { 00216 char* topics[] = {"test"}; 00217 char* brokers[] = {0}; 00218 rd_kafka_t rk; 00219 kafka_t k; 00220 config conf; 00221 k.conf = &conf; 00222 k.conf->topic = topics; 00223 k.conf->brokers = brokers; 00224 k.rk = &rk; 00225 test_with()->rd_kafka_topic_new_returns_NULL = 0; 00226 mu_assert("zhandle_t should not be null", 00227 initialize_zookeeper("", &k) != NULL); 00228 mu_assert("zhandle_t should be null", 00229 initialize_zookeeper(NULL, &k) == NULL); 00230 test_with()->rd_kafka_topic_new_returns_NULL = 1; 00231 mu_assert("zhandle_t should not be null", 00232 initialize_zookeeper("", &k) != NULL); 00233 test_with()->zoo_get_children_returns = 0; 00234 mu_assert("zhandle_t should not be null", 00235 initialize_zookeeper("", &k) != NULL); 00236 /* for coverage */ 00237 rd_kafka_destroy(NULL); 00238 rd_kafka_wait_destroyed(42); 00239 rd_kafka_topic_destroy(NULL); 00240 zookeeper_close(NULL); 00241 set_brokerlist_from_zookeeper(NULL, NULL); 00242 zhandle_t zzh; 00243 set_brokerlist_from_zookeeper(&zzh, NULL); 00244 test_with()->zoo_get_children_returns = 1; 00245 set_brokerlist_from_zookeeper(&zzh, NULL); 00246 char* b = (char*) malloc(20); 00247 set_brokerlist_from_zookeeper(&zzh, b); 00248 #define test_output_expected "a:2181,a:2181" 00249 mu_assert("b should equal " test_output_expected, strcmp(test_output_expected, b) == 0); 00250 free(b); 00251 k.conf->topic_n = 1; 00252 watcher(NULL, 0, 0, 0, &k); 00253 k.conf->brokers = topics; 00254 watcher(NULL, 0, 0, 0, &k); 00255 watcher_add_brokers(&k, "brokers", "topic"); 00256 return 0; 00257 } 00258 static char* test_trace() 00259 { 00260 SET_CONFIG; 00261 trace("blah"); 00262 return 0; 00263 } 00264 void dynamic_configuration_handler(int argc, char**argv, void* context) 00265 { 00266 *dynamic_configuration_watch_routine_running() = 0; 00267 } 00268 void zktouch(char* path) 00269 { 00270 FILE* f = fopen(path, "w"); 00271 char* str = "--zookeepers test "; 00272 flock(fileno(f), LOCK_EX); 00273 fwrite(str, strlen(str), 1, f); 00274 fclose(f); 00275 } 00276 static char* test_dynamic_configuration() 00277 { 00278 char* line; 00279 char** argv; 00280 int argc; 00281 char* conf_path = "/tmp/fuse_kafka_test_dynamic_configuration"; 00282 unlink(conf_path); 00283 unlink("/tmp/fuse_kafka.args"); 00284 mu_assert("loading dynamic configuration should fail", 00285 dynamic_configuration_load() == 1); 00286 zktouch(conf_path); 00287 dynamic_configuration_get()->path = conf_path; 00288 mu_assert("parse_line_from_file should return 1", 00289 parse_line_from_file(NULL, NULL, NULL) == 1); 00290 mu_assert("parse_args_from_file should return 1", 00291 parse_args_from_file(NULL, &argc, &argv, &line) == 1); 00292 mu_assert("parse_args_from_file should return 0", 00293 parse_args_from_file(conf_path, &argc, &argv, &line) == 0); 00294 dynamic_configuration_free(); 00295 dynamic_configuration_load(); 00296 mu_assert("dynamic_configuration_changed should return 1", 00297 dynamic_configuration_changed() == 1); /* TODO should be 0 */ 00298 dynamic_configuration_watch_stop(); 00299 *(dynamic_configuration_get_last_change()) = 1; 00300 dynamic_configuration_get()->context = (void*) 1; 00301 dynamic_configuration_watch_routine(dynamic_configuration_handler); 00302 mu_assert("dynamic configuration watch routine should have been fired up", 00303 *dynamic_configuration_watch_routine_running() == 0); 00304 *dynamic_configuration_watch_routine_running() = 1; 00305 dynamic_configuration_get()->context = NULL; 00306 unlink(conf_path); 00307 return 0; 00308 } 00309 static char* test_fk_hash() 00310 { 00311 fk_hash hash = fk_hash_new(); 00312 fk_hash_put(hash, "test", (void*)42, 1); 00313 printf("test value: %p\n", fk_hash_get(hash, "test", 1)); 00314 mu_assert("test should be 42", fk_hash_get(hash, "test", 1) == (void*)42); 00315 // teest hashes the same 00316 fk_hash_put(hash, "teest", (void*)43, 1); 00317 mu_assert("teest should be 43", fk_hash_get(hash, "teest", 1) == (void*)43); 00318 mu_assert("test #2 should be 42", fk_hash_get(hash, "test", 1) == (void*)42); 00319 fk_hash_put(hash, "test", (void*)40, 1); 00320 mu_assert("test #3 should be 40", fk_hash_get(hash, "test", 1) == (void*)40); 00321 fk_hash_remove(hash, "test", 1, 0, 0); 00322 mu_assert("test should be -1", fk_hash_get(hash, "test", 1) == (void*)-1); 00323 fk_hash_put(hash, "teeest", (void*)44, 1); 00324 mu_assert("teeest should be 44", fk_hash_get(hash, "teeest", 1) == (void*)44); 00325 fk_hash_remove(hash, "teeest", 1, 0, 0); 00326 mu_assert("teeest should be -1", fk_hash_get(hash, "teeest", 1) == (void*)-1); 00327 fk_hash_remove(hash, "test", 1, 0, 0); 00328 mu_assert("test should be -1", fk_hash_get(hash, "test", 1) == (void*)-1); 00329 fk_hash_remove(hash, "teest", 1, 0, 0); 00330 mu_assert("test should be -1", fk_hash_get(hash, "teest", 1) == (void*)-1); 00331 fk_hash_delete(hash, 0, 0); 00332 fk_hash_list_delete(fk_hash_list_new(0, 0), 0, 0); 00333 return 0; 00334 } 00335 static char* test_my_input_setup() 00336 { 00337 conf.input_n = 1; 00338 char* argv[] = {"nonexisting"}; 00339 conf.input = argv; 00340 mu_assert("test_my_input_setup should return 1", 00341 my_input_setup(0, argv, 1) == 1); 00342 return 0; 00343 } 00344 static char* test_output() 00345 { 00346 SET_CONFIG; 00347 char* excluded = "excluded"; 00348 char* quota = "10000"; 00349 test_with()->rd_kafka_conf_set_returns = RD_KAFKA_CONF_OK; 00350 test_with()->rd_kafka_topic_new_returns_NULL = 0; 00351 test_with()->rd_kafka_conf_set_returns = 0; 00352 conf.brokers_n = conf.zookeepers_n = 0; 00353 conf.output_n = 0; 00354 void* output = output_init(NULL); 00355 mu_assert("output_init(NULL) should be null", output == NULL); 00356 output = output_init(&conf); 00357 mu_assert("output should be null", output != NULL); 00358 test_with()->rd_kafka_conf_set_returns = RD_KAFKA_CONF_OK; 00359 output = output_init(&conf); 00360 mu_assert("output should not be null", output != NULL); 00361 conf.quota_queue = time_queue_new(10, 42); 00362 conf.quota_n = 1; 00363 conf.quota = "a; 00364 conf.topic_n = 0; 00365 output_destroy(output); 00366 conf.output = "a; 00367 conf.output_n = 1; 00368 output = output_init(&conf); 00369 mu_assert("output should be null with wrong output plugin pecified", output == NULL); 00370 conf.output_n = 0; 00371 output = output_init(&conf); 00372 mu_assert("output is not null", output != NULL); 00373 /* TODO uncomment 00374 mu_assert("sending empty string succeeds", 00375 send_kafka(output, "", 0) == 0);*/ 00376 output_write("", "", "", 0, 0); 00377 test_with()->asprintf_sets_NULL = 1; 00378 conf.excluded_files_n = 1; 00379 conf.excluded_files = &excluded; 00380 conf.quota_queue = time_queue_new(10, 42); 00381 conf.quota_n = 1; 00382 conf.quota = "a; 00383 conf.topic_n = 0; 00384 conf.encoder_n = 0; 00385 ((kafka_t*) output)->rkt = (void*) 1; 00386 fuse_get_context()->private_data = output; 00387 mu_assert("should not write to kafka excluded file", 00388 should_write_to_kafka(excluded, 0) == 0); 00389 mu_assert("should write to kafka not excluded file", 00390 should_write_to_kafka("test", 0) == 1); 00391 mu_assert("actual_kafka_write should return 1 if asprintf is failing", 00392 actual_kafka_write("", "", "", 0, 0) == 1); 00393 test_with()->asprintf_sets_NULL = 0; 00394 output_write("", "", "", 0, 0); 00395 mu_assert("actual_kafka_write should return 0 if asprintf is not failing", 00396 actual_kafka_write("", "", "", 0, 0) == 0); 00397 conf.encoder_n = 1; 00398 char* encoder[] = {"text"}; 00399 conf.encoder = encoder; 00400 mu_assert("actual_kafka_write should return 0 if asprintf is not failing", 00401 actual_kafka_write("", "", "", 0, 0) == 0); 00402 conf.encoder_n = 0; 00403 conf.zookeepers_n = conf.brokers_n = 0; 00404 input_setup_internal(0, NULL, &conf); 00405 ((kafka_t*)output)->zhandle = (void*) 1; 00406 setup_from_dynamic_configuration(0, NULL, output); 00407 char* argv[] = {"--zookeepers", "zk"}; 00408 int argc = sizeof(argv)/sizeof(char*); 00409 setup_from_dynamic_configuration(argc, argv, output); 00410 output_destroy(output); 00411 return 0; 00412 } 00413 static char* test_plugin() 00414 { 00415 void* f = load_plugin_function(OUTPUT_PLUGIN_PREFIX, "kafka", "blah"); 00416 mu_assert("function should not be loaded", f == NULL); 00417 f = load_plugin_function(OUTPUT_PLUGIN_PREFIX, "kafka", "output_setup"); 00418 mu_assert("function should be loaded", f != NULL); 00419 return 0; 00420 } 00421 int test_queue_n; 00422 char test_queue_chars[] = {0, 0}; 00423 void test_queue_callback(const char *prefix, const char *path, char *buf, 00424 size_t size, off_t offset) 00425 { 00426 test_queue_chars[test_queue_n++] = path[0]; 00427 } 00428 static char* test_queue() 00429 { 00430 events_dequeue(test_queue_callback); 00431 *(event_queue_max_size()) = 2; 00432 event_enqueue("a", "a", "a", 1, 0); 00433 event_enqueue("b", "b", "b", 1, 0); 00434 mu_assert("size should be 2", *(event_queue_size()) == 2); 00435 event_enqueue("c", "c", "c", 1, 0); 00436 mu_assert("size should still be 2", *(event_queue_size()) == 2); 00437 event_enqueue("d", "d", "d", 1, 0); 00438 mu_assert("size should again still be 2", *(event_queue_size()) == 2); 00439 test_queue_n = 0; 00440 events_dequeue(test_queue_callback); 00441 mu_assert("enqueued n should be 2", test_queue_n == 2); 00442 mu_assert("enqueued first char should be 'c'", test_queue_chars[0] == 'c'); 00443 mu_assert("enqueued second char should be 'd'", test_queue_chars[1] == 'd'); 00444 return 0; 00445 } 00446 static char* all_tests() 00447 { 00448 *(fk_sleep_enabled()) = 0; 00449 mu_run_test(test_parse_arguments); 00450 mu_run_test(test_utils); 00451 mu_run_test(test_utils_base64); 00452 printf("a\n"); 00453 mu_run_test(test_time_queue); 00454 printf("b\n"); 00455 mu_run_test(test_output); 00456 mu_run_test(test_zookeeper); 00457 mu_run_test(test_trace); 00458 mu_run_test(test_string_list); 00459 mu_run_test(test_server_list); 00460 mu_run_test(test_dynamic_configuration); 00461 mu_run_test(test_fk_hash); 00462 mu_run_test(test_my_input_setup); 00463 mu_run_test(test_plugin); 00464 mu_run_test(test_queue); 00465 return 0; 00466 } 00467 // LCOV_EXCL_STOP because we don't want coverage on unit tests 00468 #include "minunit.c"