![]() |
fuse_kafka
|
/**
 * One buffered event, stored in a doubly linked list that runs from the
 * oldest event (previous == NULL) to the newest one (next == NULL).
 *
 * prefix, path and buf are heap copies owned by the event; they are
 * released by event_delete(). size and offset are stored exactly as they
 * were given to event_enqueue().
 */
typedef struct _queue_event
{
    char* prefix;
    char* path;
    char* buf;
    size_t size;
    off_t offset;
    struct _queue_event* previous;
    struct _queue_event* next;
} queue_event;

/**
 * @return the address of the pointer to the newest queued event; the
 *         pointed-to value is NULL while the queue is empty.
 */
queue_event** event_last_get(void)
{
    static queue_event* last = NULL;
    return &last;
}

/**
 * @return the address of the counter holding the number of queued events.
 */
int* event_queue_size(void)
{
    static int n = 0;
    return &n;
}

/**
 * @return the address of the queue capacity (10000 by default); callers
 *         may write through the pointer to change the limit.
 */
int* event_queue_max_size(void)
{
    static int n = 10000;
    return &n;
}

/**
 * Duplicates len bytes of src into a fresh heap buffer and appends a
 * terminating NUL, so the copy is usable both as a sized buffer and as a
 * C string.
 *
 * @return the copy (to be released with free()), or NULL if src is NULL,
 *         len + 1 would overflow, or the allocation fails.
 */
static char* queue_event_copy(const char* src, size_t len)
{
    char* copy;

    if(src == NULL || len == (size_t) -1) return NULL;
    copy = malloc(len + 1);
    if(copy == NULL) return NULL;
    memcpy(copy, src, len);
    copy[len] = '\0';
    return copy;
}

/**
 * Releases an event together with the strings it owns and decrements the
 * queue size counter. The event must already be unlinked from the list
 * (or the caller must be discarding the whole list). NULL is ignored.
 */
void event_delete(queue_event* event)
{
    if(event == NULL) return;
    free(event->prefix);
    free(event->path);
    free(event->buf);
    free(event);
    (*event_queue_size())--;
}

/**
 * Removes and frees the oldest event of the queue, if any.
 *
 * The list is walked backwards from the newest event, so the cost is
 * linear in the queue length.
 */
void events_drop_first(void)
{
    queue_event** last = event_last_get();
    queue_event* successor = NULL;
    queue_event* first = *last;

    if(first == NULL) return;
    while(first->previous != NULL)
    {
        successor = first;
        first = first->previous;
    }
    /* unlink the head: either its successor becomes the new head, or the
     * queue only held this one event and is now empty */
    if(successor != NULL) successor->previous = NULL;
    else *last = NULL;
    /* event_delete() also frees prefix/path/buf and updates the counter */
    event_delete(first);
}

/**
 * Appends a copy of the given event at the end of the queue.
 *
 * prefix and path are copied as NUL-terminated strings. buf is copied as
 * exactly size bytes (plus a trailing NUL), so payloads containing NUL
 * bytes or lacking a terminator are preserved in full.
 *
 * When the queue already holds event_queue_max_size() events, the oldest
 * one is dropped to make room. If an argument is NULL or memory cannot be
 * allocated, the event is discarded and the queue is left unchanged.
 *
 * @param prefix NUL-terminated string stored with the event
 * @param path   NUL-terminated string stored with the event
 * @param buf    payload, at least size bytes long
 * @param size   number of payload bytes
 * @param offset value stored as-is and passed back to the writer
 */
void event_enqueue(char *prefix, char *path, char *buf,
        size_t size, off_t offset)
{
    queue_event** last = event_last_get();
    queue_event* node;

    if(prefix == NULL || path == NULL || buf == NULL) return;
    node = malloc(sizeof *node);
    if(node == NULL) return;
    node->prefix = queue_event_copy(prefix, strlen(prefix));
    node->path = queue_event_copy(path, strlen(path));
    node->buf = queue_event_copy(buf, size);
    if(node->prefix == NULL || node->path == NULL || node->buf == NULL)
    {
        /* free(NULL) is a no-op, so partial failures need no special case */
        free(node->prefix);
        free(node->path);
        free(node->buf);
        free(node);
        return;
    }
    node->size = size;
    node->offset = offset;
    node->next = NULL;
    node->previous = *last;
    if(node->previous != NULL) node->previous->next = node;
    *last = node;
    /* compare the size before insertion against the limit, then count the
     * new event; the drop below brings the queue back to the limit */
    if((*event_queue_size())++ >= *event_queue_max_size())
        events_drop_first();
}

/**
 * Frees every queued event, newest first, leaving the queue empty.
 */
void events_queue_empty(void)
{
    queue_event** last = event_last_get();

    while(*last != NULL)
    {
        queue_event* to_free = *last;
        *last = to_free->previous;
        event_delete(to_free);
    }
}

/**
 * Hands every queued event to writer, oldest first, then empties the
 * queue. Does nothing when the queue is empty.
 *
 * @param writer callback receiving the stored prefix, path, payload, size
 *               and offset of each event; the pointers are only valid for
 *               the duration of the call since the events are freed
 *               afterwards
 */
void events_dequeue(void (*writer)(const char *prefix, const char *path, char *buf,
            size_t size, off_t offset))
{
    queue_event* current = *event_last_get();

    if(current == NULL) return;
    /* rewind to the oldest event so delivery happens in insertion order */
    while(current->previous != NULL) current = current->previous;
    for(; current != NULL; current = current->next)
    {
        writer(current->prefix, current->path,
                current->buf, current->size, current->offset);
    }
    events_queue_empty();
}