@@ -27,39 +27,39 @@ use std::fs;
 use std::io::{BufReader, Cursor, Seek, SeekFrom, Write};
 use std::sync::Arc;

-use crate::mem_store;
+use crate::metadata;
 use crate::response;
 use crate::storage;
 use crate::Error;

-// Event holds all values relevant to a single event for a single logstream
+// Event holds all values relevant to a single event for a single log stream
 pub struct Event {
     pub body: String,
     pub stream_name: String,
     pub path: String,
     pub schema: Bytes,
 }

-// Events holds the schema related to a each event for a single logstream
+// Schema holds the schema related to each event for a single log stream
 pub struct Schema {
     pub arrow_schema: arrow::datatypes::Schema,
     pub string_schema: String,
 }

 impl Event {
     pub fn process(&self) -> Result<response::EventResponse, Error> {
-        // If the .schema file is still empty, this is the first event in this logstream.
+        // If the .schema file is still empty, this is the first event in this log stream.
         if self.schema.is_empty() {
-            self.initial_event()
+            self.first_event()
         } else {
-            self.next_event()
+            self.event()
         }
     }

-    // This is called when the first event of a LogStream is received. The first event is
-    // special because we parse this event to generate the schema for the logstream. This
-    // schema is then enforced on rest of the events sent to this logstream.
-    fn initial_event(&self) -> Result<response::EventResponse, Error> {
+    // This is called when the first event of a log stream is received. The first event is
+    // special because we parse this event to generate the schema for the log stream. This
+    // schema is then enforced on the rest of the events sent to this log stream.
+    fn first_event(&self) -> Result<response::EventResponse, Error> {
         let mut c = Cursor::new(Vec::new());
         let reader = self.body.as_bytes();
@@ -75,39 +75,41 @@ impl Event {
         );
         let b1 = event.next()?.ok_or(Error::MissingRecord)?;

-        // Put the event into in memory store
-        mem_store::MEM_STREAMS::put(
-            self.stream_name.to_string(),
-            mem_store::LogStream {
-                schema: Some(self.infer_schema().string_schema),
-                rb: Some(b1.clone()),
-            },
-        );
-
         // Store record batch to Parquet file on local cache
         self.convert_arrow_parquet(b1);

         // Put the inferred schema to object store
-        storage::put_schema(&self.stream_name, self.infer_schema().string_schema).map_err(|e| {
+        let schema = self.infer_schema().string_schema;
+        let stream_name = &self.stream_name;
+        storage::put_schema(stream_name.clone(), schema.clone()).map_err(|e| {
             Error::Event(response::EventError {
                 msg: format!(
-                    "Failed to upload schema for LogStream {} due to err: {}",
+                    "Failed to upload schema for log stream {} due to err: {}",
                     self.stream_name, e
                 ),
             })
         })?;

+        if let Err(e) = metadata::STREAM_INFO.set_schema(stream_name.to_string(), schema) {
+            return Err(Error::Event(response::EventError {
+                msg: format!(
+                    "Failed to set schema for log stream {} due to err: {}",
+                    stream_name, e
+                ),
+            }));
+        }
+
         Ok(response::EventResponse {
             msg: format!(
-                "Intial Event recieved for LogStream {}, schema uploaded successfully",
+                "Initial event received for log stream {}, schema uploaded successfully",
                 self.stream_name
             ),
         })
     }

-    // next_event process all events after the 1st event. Concatenates record batches
+    // event processes all events after the first event. Concatenates record batches
     // and puts them in memory store for each event.
-    fn next_event(&self) -> Result<response::EventResponse, Error> {
+    fn event(&self) -> Result<response::EventResponse, Error> {
         let mut c = Cursor::new(Vec::new());
         let reader = self.body.as_bytes();
         c.write_all(reader).unwrap();
@@ -119,31 +121,22 @@ impl Event {
             1024,
             None,
         );
-        let next_event_rb = event.next().unwrap().unwrap();
+        let _next_event_rb = event.next().unwrap().unwrap();

-        let rb = mem_store::MEM_STREAMS::get_rb(self.stream_name.clone())?;
+        // TODO -- Read existing data file and append the record and write it back
+        // let vec = vec![next_event_rb.clone(), rb];
+        // let new_batch = RecordBatch::concat(&next_event_rb.schema(), &vec);

-        let vec = vec![next_event_rb.clone(), rb];
-        let new_batch = RecordBatch::concat(&next_event_rb.schema(), &vec);
-
-        let rb = new_batch.map_err(|e| {
-            Error::Event(response::EventError {
-                msg: format!("Error recieved for LogStream {}, {}", &self.stream_name, e),
-            })
-        })?;
-
-        mem_store::MEM_STREAMS::put(
-            self.stream_name.clone(),
-            mem_store::LogStream {
-                schema: Some(mem_store::MEM_STREAMS::get_schema(self.stream_name.clone())),
-                rb: Some(rb.clone()),
-            },
-        );
+        // let rb = new_batch.map_err(|e| {
+        //     Error::Event(response::EventError {
+        //         msg: format!("Error received for log stream {}, {}", &self.stream_name, e),
+        //     })
+        // })?;

-        self.convert_arrow_parquet(rb);
+        // self.convert_arrow_parquet(rb);

         Ok(response::EventResponse {
-            msg: format!("Event recieved for LogStream {}", &self.stream_name),
+            msg: format!("Event received for log stream {}", &self.stream_name),
         })
     }
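
Note on the TODO left in event(): once the existing local data file can be read back, the commented-out concatenation path could roughly be restored along the lines below. This is only a sketch and not part of the commit: read_local_batch is a hypothetical helper standing in for "read the stream's existing data file into a RecordBatch", and the sketch reuses the RecordBatch::concat call and error mapping that the removed code already relied on.

    // Sketch only -- `read_local_batch` is hypothetical, not an existing method.
    let next_event_rb = event.next().unwrap().unwrap();
    let rb = self.read_local_batch()?; // read the existing data file back as a RecordBatch
    let batches = vec![next_event_rb.clone(), rb];
    // Append the new record batch to the existing one, then persist the result.
    let new_batch = RecordBatch::concat(&next_event_rb.schema(), &batches).map_err(|e| {
        Error::Event(response::EventError {
            msg: format!("Error received for log stream {}, {}", &self.stream_name, e),
        })
    })?;
    // Write the combined batch back to the local Parquet file.
    self.convert_arrow_parquet(new_batch);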