![]() |
fuse_kafka
|
00001 #include "version.h" 00002 #include "output.h" 00003 #include "queue.c" 00004 #include "time_queue.c" 00005 #ifndef TEST 00006 #include "dynamic_configuration.c" 00007 #include "arguments.c" 00008 #endif 00009 #include "plugin.c" 00010 #ifdef _WIN32 00011 struct group { 00012 char* gr_name; 00013 }; 00014 struct passwd { 00015 char* pw_name; 00016 }; 00017 void* getgrgid(int i) 00018 { 00019 return NULL; 00020 } 00021 void* getpwuid(int i) 00022 { 00023 return NULL; 00024 } 00025 #else 00026 #include <grp.h> 00027 #include <pwd.h> 00028 #endif 00029 00038 static int actual_kafka_write(const char* prefix, const char *path, char *buf, 00039 size_t size, off_t offset) 00040 { 00041 kafka_t *private_data = (kafka_t*) fuse_get_context()->private_data; 00042 config* conf = (config*)private_data->conf; 00043 size_t i; 00044 char* ret = NULL; 00045 char* logstash = "logstash"; 00046 char* logstash_base64 = "logstash_base64"; 00047 char timestamp[] = "YYYY-MM-ddTHH:mm:ss.SSS+0000 "; 00048 set_timestamp(timestamp); 00049 char* text = buf; 00050 struct fuse_context* context = fuse_get_context(); 00051 struct group* sgroup = getgrgid(context->gid); 00052 struct passwd* suser = getpwuid(context->uid); 00053 char* user = suser == NULL ? "":suser->pw_name; 00054 char* group = sgroup == NULL ? "":sgroup->gr_name; 00055 char* command = get_command_line(context->pid); 00056 char* encoder = NULL; 00057 if(conf->encoder_n > 0) encoder = conf->encoder[0]; 00058 else encoder = logstash_base64; 00059 trace_debug("actual_kafka_write: encoder is %s", encoder); 00060 if(strncmp(encoder, logstash, strlen(logstash)) == 0) 00061 { 00062 int b64 = 0; 00063 if(b64 = (strncmp(encoder, logstash_base64, strlen(logstash_base64)) == 0)) 00064 text = base64(buf, size); 00065 char* format = "{\"path\": \"%s%s\", \"pid\": %d, \"uid\": %d, " 00066 "\"gid\": %d, \"@message\": \"%s\", \"@timestamp\": \"%s\"," 00067 "\"user\": \"%s\", \"group\": \"%s\", \"command\": \"%s\"," 00068 "\"@version\": \"%s\", \"@fields\": %s, \"@tags\": %s}"; 00069 asprintf(&ret, format, prefix, 00070 path + 1, context->pid, context->uid, context->gid, 00071 text, timestamp, user, group, command, VERSION, 00072 conf->fields_s, conf->tags_s); 00073 if(b64) free(text); 00074 } 00075 else if(strcmp(encoder, "text") == 0) 00076 { 00077 char* format = "%s: %s: %s"; 00078 asprintf(&ret, format, timestamp, path, text); 00079 } 00080 free(command); 00081 if (ret == NULL) { 00082 fprintf(stderr, "Error in asprintf\n"); 00083 return 1; 00084 } 00085 size_t length = strlen(ret); 00086 if(strstr(encoder, "fpenzoyr") != NULL) 00087 for(i = 0; i < length; i++) ret[i] ^= 255; 00088 trace_debug("actual_kafka_write: calling my_output_send()"); 00089 int r = my_output_send(context->private_data, ret, length); 00090 trace_debug("actual_kafka_write: my_output_send result == %d", r); 00091 free(ret); 00092 return 0; 00093 } 00094 #include "trace.c" 00101 static int should_write_to_kafka(const char* path, size_t size) 00102 { 00103 kafka_t *private_data = (kafka_t*) fuse_get_context()->private_data; 00104 config* conf = (config*)private_data->conf; 00105 int i = 0; 00106 for(i = 0; i < conf->excluded_files_n; i++) 00107 { 00108 char* pattern = conf->excluded_files[i]; 00109 if(!fnmatch(pattern, path, 0)) 00110 { 00111 return 0; 00112 } 00113 } 00114 if(conf->quota_queue == NULL) return 1; 00115 if(time_queue_overflows(conf->quota_queue, (char*)path, size)) i = 0; 00116 else i = 1; 00117 time_queue_set(conf->quota_queue, (char*)path); 00118 return i; 00119 } 00120 int ready_to_write() 00121 { 00122 kafka_t *private_data = (kafka_t*) fuse_get_context()->private_data; 00123 trace_debug("should_write_to_kafka: private_data %x", private_data); 00124 if(private_data == NULL) return 0; 00125 trace_debug("should_write_to_kafka: private_data->rkt %x", private_data->rkt); 00126 if(private_data->rkt == NULL) return 0; 00127 return 1; 00128 } 00129 void output_write_without_queue(const char *prefix, const char *path, char *buf, 00130 size_t size, off_t offset) 00131 { 00132 if(should_write_to_kafka(path, size)) 00133 actual_kafka_write(prefix, path, buf, size, offset); 00134 } 00135 void output_write(const char *prefix, const char *path, char *buf, 00136 size_t size, off_t offset) 00137 { 00138 if(ready_to_write()) 00139 { 00140 events_dequeue(output_write_without_queue); 00141 output_write_without_queue(prefix, path, buf, size, offset); 00142 } 00143 else 00144 { 00145 event_enqueue((char*) prefix, (char*) path, buf, size, offset); 00146 } 00147 } 00148 #define PLUGIN_FUNCTION_GETTER(name)\ 00149 name##_t* get_##name()\ 00150 {\ 00151 static name##_t function_ptr = 0; \ 00152 return &function_ptr;\ 00153 } 00154 #define PLUGIN_FUNCTION(name)\ 00155 trace_debug("PLUGIN_FUNCTION: getting " STR(name));\ 00156 name##_t ptr = *(get_##name());\ 00157 trace_debug("PLUGIN_FUNCTION: got " STR(name) ": %x", ptr);\ 00158 if(ptr == NULL) return 1; return (*ptr) 00159 #define PLUGIN_FUNCTION_LOAD(name)\ 00160 trace_debug("my_output_setup: load_function_from_plugin("STR(name)\ 00161 ") result %x", f);\ 00162 *(get_##name()) = (name##_t) load_function_from_plugin(\ 00163 handle, STR(name));\ 00164 trace_debug("my_output_setup: load_function_from_plugin("STR(name)\ 00165 ") result %x",\ 00166 get_output_send()); 00167 PLUGIN_FUNCTION_GETTER(output_send) 00168 PLUGIN_FUNCTION_GETTER(output_clean) 00169 PLUGIN_FUNCTION_GETTER(output_update) 00170 int my_output_send(kafka_t* k, char* buf, size_t len) 00171 { PLUGIN_FUNCTION(output_send)(k, buf, len); } 00172 int my_output_clean(kafka_t* k) { PLUGIN_FUNCTION(output_clean)(k); } 00173 int my_output_update(kafka_t* k) { PLUGIN_FUNCTION(output_update)(k); } 00174 void output_destroy(void* untyped) 00175 { 00176 kafka_t* k = (kafka_t*) untyped; 00177 if(k == NULL) return; 00178 if(k->conf->quota_n > 0) time_queue_delete(k->conf->quota_queue); 00179 if(k->zhandle != NULL) zookeeper_close(k->zhandle); 00180 my_output_clean(k); 00181 free(k); 00182 dynamic_configuration_watch_stop(); 00183 } 00184 void setup_from_dynamic_configuration(int argc, char** argv, void* context) 00185 { 00186 kafka_t* k = (kafka_t*) context; 00187 memset(k->conf, 0, sizeof(config)); 00188 parse_arguments(argc, argv, k->conf); 00189 if(k->zhandle != NULL) 00190 { 00191 zookeeper_close(k->zhandle); 00192 k->zhandle = NULL; 00193 } 00194 my_output_update(k); 00195 } 00196 int my_output_setup(config* conf, void* k) 00197 { 00198 char* output = "kafka"; 00199 if(conf->output_n > 0) output = conf->output[0]; 00200 trace_debug("my_output_setup: loading output plugin %s", output); 00201 void* handle = load_plugin(OUTPUT_PLUGIN_PREFIX, output); 00202 trace_debug("my_output_setup: load_plugin result %x", handle); 00203 output_setup_t f = (output_setup_t) load_function_from_plugin(handle, "output_setup"); 00204 PLUGIN_FUNCTION_LOAD(output_send) 00205 PLUGIN_FUNCTION_LOAD(output_update) 00206 PLUGIN_FUNCTION_LOAD(output_clean) 00207 if(f != NULL) return f(k, conf); 00208 trace_debug("my_output_setup: output_setup is NULL"); 00209 return 1; 00210 } 00211 void* output_init(config* conf) 00212 { 00213 trace_debug("output_init: entry"); 00214 if(conf == NULL) return NULL; 00215 fuse_get_context()->private_data = (void*) conf; 00216 dynamic_configuration_watch(&setup_from_dynamic_configuration); 00217 trace_debug("output_init: watching dynamic configuration"); 00218 int directory_fd = conf->directory_fd; 00219 int time_queue_size; 00220 fchdir(directory_fd); 00221 close(directory_fd); 00222 kafka_t* k = (kafka_t*) malloc(sizeof(kafka_t)); 00223 memset(k, 0, sizeof(kafka_t)); 00224 trace_debug("output_init: calling my_output_setup"); 00225 if(my_output_setup(conf, (void*) k)) 00226 { 00227 printf("output_init: output_setup failed\n"); 00228 return NULL; 00229 } 00230 k->conf = conf; 00231 if(conf->quota_n > 0) 00232 { 00233 time_queue_size = conf->quota_n > 1 ? atoi(conf->quota[1]):20; 00234 conf->quota_queue = time_queue_new( 00235 time_queue_size, atoi(conf->quota[0])); 00236 } 00237 dynamic_configuration_get()->context = (void*) k; 00238 fuse_get_context()->private_data = (void*) k; 00239 return (void*) k; 00240 } 00241 void input_setup_internal(int argc, char** argv, void* conf) 00242 { 00243 fuse_get_context()->private_data = (void*) output_init((config*) conf); 00244 #ifndef TEST 00245 input_setup(argc, argv, conf); 00246 #endif 00247 }