fuse_kafka
src/output.c
00001 #include "version.h"
00002 #include "output.h"
00003 #include "queue.c"
00004 #include "time_queue.c"
00005 #ifndef TEST
00006 #include "dynamic_configuration.c"
00007 #include "arguments.c"
00008 #endif
00009 #include "plugin.c"
00010 #ifdef _WIN32
/* Stand-in for struct group used when _WIN32 is defined; it only carries
 * the gr_name member that this file reads. */
struct group {
    char* gr_name;
};
/* Stand-in for struct passwd used when _WIN32 is defined; it only carries
 * the pw_name member that this file reads. */
struct passwd {
    char* pw_name;
};
/* Stub group lookup: ignores the id and always returns NULL. */
void* getgrgid(int i)
{
    (void) i;
    void* no_group = NULL;
    return no_group;
}
/* Stub user lookup: ignores the id and always returns NULL. */
void* getpwuid(int i)
{
    (void) i;
    void* no_user = NULL;
    return no_user;
}
00025 #else
00026 #include <grp.h>
00027 #include <pwd.h>
00028 #endif
00029 
00038 static int actual_kafka_write(const char* prefix, const char *path, char *buf,
00039         size_t size, off_t offset)
00040 {
00041     kafka_t *private_data = (kafka_t*) fuse_get_context()->private_data;
00042     config* conf = (config*)private_data->conf;
00043     size_t i;
00044     char* ret = NULL;
00045     char* logstash = "logstash";
00046     char* logstash_base64 = "logstash_base64";
00047     char timestamp[] = "YYYY-MM-ddTHH:mm:ss.SSS+0000                               ";
00048     set_timestamp(timestamp);
00049     char* text = buf;
00050     struct fuse_context* context = fuse_get_context();
00051     struct group* sgroup = getgrgid(context->gid);
00052     struct passwd* suser = getpwuid(context->uid);
00053     char* user = suser == NULL ? "":suser->pw_name;
00054     char* group = sgroup == NULL ? "":sgroup->gr_name;
00055     char* command = get_command_line(context->pid);
00056     char* encoder = NULL;
00057     if(conf->encoder_n > 0) encoder = conf->encoder[0];
00058     else encoder = logstash_base64;
00059     trace_debug("actual_kafka_write: encoder is %s", encoder);
00060     if(strncmp(encoder, logstash, strlen(logstash)) == 0)
00061     {
00062         int b64 = 0;
00063         if(b64 = (strncmp(encoder, logstash_base64, strlen(logstash_base64)) == 0))
00064             text = base64(buf, size);
00065         char* format = "{\"path\": \"%s%s\", \"pid\": %d, \"uid\": %d, "
00066             "\"gid\": %d, \"@message\": \"%s\", \"@timestamp\": \"%s\","
00067             "\"user\": \"%s\", \"group\": \"%s\", \"command\": \"%s\","
00068             "\"@version\": \"%s\", \"@fields\": %s, \"@tags\": %s}";
00069         asprintf(&ret, format, prefix,
00070                 path + 1, context->pid, context->uid, context->gid,
00071                 text, timestamp, user, group, command, VERSION,
00072                 conf->fields_s, conf->tags_s);
00073         if(b64) free(text);
00074     }
00075     else if(strcmp(encoder, "text") == 0)
00076     {
00077         char* format = "%s: %s: %s";
00078         asprintf(&ret, format, timestamp, path, text);
00079     }
00080     free(command);
00081     if (ret == NULL) {
00082         fprintf(stderr, "Error in asprintf\n");
00083         return 1;
00084     }
00085     size_t length = strlen(ret);
00086     if(strstr(encoder, "fpenzoyr") != NULL)
00087         for(i = 0; i < length; i++) ret[i] ^= 255;
00088     trace_debug("actual_kafka_write: calling my_output_send()");
00089     int r = my_output_send(context->private_data, ret, length);
00090     trace_debug("actual_kafka_write: my_output_send result == %d", r);
00091     free(ret);
00092     return 0;
00093 }
00094 #include "trace.c"
00101 static int should_write_to_kafka(const char* path, size_t size)
00102 {
00103     kafka_t *private_data = (kafka_t*) fuse_get_context()->private_data;
00104     config* conf = (config*)private_data->conf;
00105     int i = 0;
00106     for(i = 0; i < conf->excluded_files_n; i++)
00107     {
00108         char* pattern = conf->excluded_files[i];
00109         if(!fnmatch(pattern, path, 0))
00110         {
00111             return 0;
00112         }
00113     }
00114     if(conf->quota_queue == NULL) return 1;
00115     if(time_queue_overflows(conf->quota_queue, (char*)path, size)) i = 0;
00116     else i = 1;
00117     time_queue_set(conf->quota_queue, (char*)path);
00118     return i;
00119 }
00120 int ready_to_write()
00121 {
00122     kafka_t *private_data = (kafka_t*) fuse_get_context()->private_data;
00123     trace_debug("should_write_to_kafka: private_data %x", private_data);
00124     if(private_data == NULL) return 0;
00125     trace_debug("should_write_to_kafka: private_data->rkt %x", private_data->rkt);
00126     if(private_data->rkt == NULL) return 0;
00127     return 1;
00128 }
/**
 * @brief Forwards a write event immediately, bypassing the event queue.
 *
 * The event is passed to actual_kafka_write() only when
 * should_write_to_kafka() accepts @p path and @p size.
 */
void output_write_without_queue(const char *prefix, const char *path, char *buf,
        size_t size, off_t offset)
{
    if(!should_write_to_kafka(path, size))
        return;
    actual_kafka_write(prefix, path, buf, size, offset);
}
00135 void output_write(const char *prefix, const char *path, char *buf,
00136         size_t size, off_t offset)
00137 {
00138     if(ready_to_write())
00139     {
00140         events_dequeue(output_write_without_queue);
00141         output_write_without_queue(prefix, path, buf, size, offset);
00142     }
00143     else
00144     {
00145         event_enqueue((char*) prefix, (char*) path, buf, size, offset);
00146     }
00147 }
/* Defines get_<name>(): it returns the address of a function-local static
 * variable of type <name>_t, initialised to 0, so the same slot can be
 * both written and read through the getter. */
#define PLUGIN_FUNCTION_GETTER(name)\
name##_t* get_##name()\
{\
    static name##_t loaded_function = 0;\
    return &loaded_function;\
}
/* Expands to the body of a wrapper around the <name> slot: the enclosing
 * function returns 1 when the slot is NULL, otherwise it returns the result
 * of calling the stored function with the argument list that follows the
 * macro invocation. */
#define PLUGIN_FUNCTION(name)\
    trace_debug("PLUGIN_FUNCTION: getting " STR(name));\
    name##_t plugin_function = *(get_##name());\
    trace_debug("PLUGIN_FUNCTION: got " STR(name) ": %x", plugin_function);\
    if(plugin_function == NULL)\
        return 1;\
    return plugin_function
/* Loads the symbol <name> from the plugin `handle` and stores it in the
 * slot returned by get_<name>(). Must be expanded in a scope where `handle`
 * and `f` are declared.
 * The trace issued after loading prints the pointer that was just stored
 * for <name>.
 * NOTE(review): the first trace runs before anything is loaded and prints
 * the caller's `f`, not a result for <name>; kept as is, confirm whether it
 * is still wanted. */
#define PLUGIN_FUNCTION_LOAD(name)\
    trace_debug("my_output_setup: load_function_from_plugin("STR(name)\
            ") result %x", f);\
    *(get_##name()) = (name##_t) load_function_from_plugin(\
            handle, STR(name));\
    trace_debug("my_output_setup: load_function_from_plugin("STR(name)\
            ") result %x",\
            *(get_##name()));
/* Define get_output_send(), get_output_clean() and get_output_update(),
 * one slot per function looked up in the output plugin. */
PLUGIN_FUNCTION_GETTER(output_send)
PLUGIN_FUNCTION_GETTER(output_clean)
PLUGIN_FUNCTION_GETTER(output_update)
00170 int my_output_send(kafka_t* k, char* buf, size_t len)
00171 { PLUGIN_FUNCTION(output_send)(k, buf, len); }
00172 int my_output_clean(kafka_t* k) { PLUGIN_FUNCTION(output_clean)(k); }
00173 int my_output_update(kafka_t* k) { PLUGIN_FUNCTION(output_update)(k); }
00174 void output_destroy(void* untyped)
00175 {
00176     kafka_t* k = (kafka_t*) untyped;
00177     if(k == NULL) return;
00178     if(k->conf->quota_n > 0) time_queue_delete(k->conf->quota_queue);
00179     if(k->zhandle != NULL) zookeeper_close(k->zhandle);
00180     my_output_clean(k);
00181     free(k);
00182     dynamic_configuration_watch_stop();
00183 }
00184 void setup_from_dynamic_configuration(int argc, char** argv, void* context)
00185 {
00186     kafka_t* k = (kafka_t*) context;
00187     memset(k->conf, 0, sizeof(config));
00188     parse_arguments(argc, argv, k->conf);
00189     if(k->zhandle != NULL)
00190     {
00191         zookeeper_close(k->zhandle);
00192         k->zhandle = NULL;
00193     }
00194     my_output_update(k);
00195 }
00196 int my_output_setup(config* conf, void* k)
00197 {
00198     char* output = "kafka";
00199     if(conf->output_n > 0) output = conf->output[0];
00200     trace_debug("my_output_setup: loading output plugin %s", output);
00201     void* handle = load_plugin(OUTPUT_PLUGIN_PREFIX, output);
00202     trace_debug("my_output_setup: load_plugin result %x", handle);
00203     output_setup_t f = (output_setup_t) load_function_from_plugin(handle, "output_setup");
00204     PLUGIN_FUNCTION_LOAD(output_send)
00205     PLUGIN_FUNCTION_LOAD(output_update)
00206     PLUGIN_FUNCTION_LOAD(output_clean)
00207     if(f != NULL) return f(k, conf);
00208     trace_debug("my_output_setup: output_setup is NULL");
00209     return 1;
00210 }
00211 void* output_init(config* conf)
00212 {
00213     trace_debug("output_init: entry");
00214     if(conf == NULL) return NULL;
00215     fuse_get_context()->private_data = (void*) conf;
00216     dynamic_configuration_watch(&setup_from_dynamic_configuration);
00217     trace_debug("output_init: watching dynamic configuration");
00218     int directory_fd = conf->directory_fd;
00219     int time_queue_size;
00220     fchdir(directory_fd);
00221     close(directory_fd);
00222     kafka_t* k = (kafka_t*) malloc(sizeof(kafka_t));
00223     memset(k, 0, sizeof(kafka_t));
00224     trace_debug("output_init: calling my_output_setup");
00225     if(my_output_setup(conf, (void*) k))
00226     {
00227         printf("output_init: output_setup failed\n");
00228         return NULL;
00229     }
00230     k->conf = conf;
00231     if(conf->quota_n > 0)
00232     {
00233         time_queue_size = conf->quota_n > 1 ? atoi(conf->quota[1]):20;
00234         conf->quota_queue = time_queue_new(
00235                 time_queue_size, atoi(conf->quota[0]));
00236     }
00237     dynamic_configuration_get()->context = (void*) k;
00238     fuse_get_context()->private_data = (void*) k;
00239     return (void*) k;
00240 }
00241 void input_setup_internal(int argc, char** argv, void* conf)
00242 {
00243     fuse_get_context()->private_data = (void*) output_init((config*) conf);
00244 #ifndef TEST
00245     input_setup(argc, argv, conf);
00246 #endif
00247 }
 All Data Structures Files Functions Variables Defines