Browse Source

Rewrite first part of the EntityStore into using proper async

master
Puck Meerburg 6 months ago
parent
commit
424a2dcc7e

+ 3
- 2
Cargo.toml View File

@@ -4,10 +4,11 @@ version = "0.1.0"
4 4
 authors = ["Puck Meerburg <puck@puckipedia.com>"]
5 5
 
6 6
 [dependencies]
7
-diesel = { version = "~1.2", features = ["postgres"] }
8 7
 jsonld = { path = "../jsonld-rs" }
9 8
 dotenv = "0.9.0"
10 9
 serde = "1.0"
11 10
 serde_json = "1.0"
12 11
 kroeg-tap = { path = "../tap/tap" }
13
-futures = "0.1"
12
+futures = "0.1"
13
+tokio-postgres = { git = "https://github.com/sfackler/rust-postgres" }
14
+futures-state-stream = "0.2"

+ 0
- 0
migrations/.gitkeep View File


+ 0
- 3
migrations/00000000000000_diesel_initial_setup/down.sql View File

@@ -1,6 +0,0 @@
1
-
2
-DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
3
-DROP FUNCTION IF EXISTS diesel_set_updated_at();

+ 0
- 25
migrations/00000000000000_diesel_initial_setup/up.sql View File

@@ -1,36 +0,0 @@
1
-
2
-
3
-
4
-
5
---
6
---
7
---
8
-CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
9
-BEGIN
10
-    EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
11
-                    FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
12
-END;
13
-$$ LANGUAGE plpgsql;
14
-
15
-CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
16
-BEGIN
17
-    IF (
18
-        NEW IS DISTINCT FROM OLD AND
19
-        NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
20
-    ) THEN
21
-        NEW.updated_at := current_timestamp;
22
-    END IF;
23
-    RETURN NEW;
24
-END;
25
-$$ LANGUAGE plpgsql;

+ 0
- 2
migrations/2018-05-14-135610_base_database/down.sql View File

@@ -1,2 +0,0 @@
1
-DROP TABLE quad;
2
-DROP TABLE attribute;

+ 0
- 23
migrations/2018-05-14-135610_base_database/up.sql View File

@@ -1,23 +0,0 @@
1
-CREATE TABLE attribute (
2
-    id SERIAL PRIMARY KEY,
3
-    url TEXT UNIQUE NOT NULL
4
-);
5
-
6
-CREATE INDEX attribute_url on attribute (url);
7
-
8
-CREATE TABLE quad (
9
-    id SERIAL PRIMARY KEY,
10
-    quad_id integer not null references attribute,
11
-    subject_id integer not null references attribute,
12
-    predicate_id integer not null references attribute,
13
-
14
-    -- either
15
-    attribute_id integer null references attribute,
16
-
17
-    -- or
18
-    object text null,
19
-    type_id integer null references attribute,
20
-    language char(2) null
21
-);
22
-
23
-CREATE INDEX quad_quad_id on quad (quad_id);

+ 0
- 1
migrations/2018-06-17-211047_add_collections/down.sql View File

@@ -1 +0,0 @@
1
-DROP TABLE collection_item;

+ 0
- 8
migrations/2018-06-17-211047_add_collections/up.sql View File

@@ -1,8 +0,0 @@
1
-CREATE TABLE collection_item (
2
-    id SERIAL PRIMARY KEY,
3
-
4
-    collection_id integer not null references attribute,
5
-    object_id integer not null references attribute
6
-);
7
-
8
-CREATE INDEX collection_item_collection on collection_item (collection_id);

+ 0
- 1
migrations/2018-07-17-192617_add_queue/down.sql View File

@@ -1 +0,0 @@
1
-drop table queue_item;

+ 0
- 5
migrations/2018-07-17-192617_add_queue/up.sql View File

@@ -1,5 +0,0 @@
1
-CREATE TABLE queue_item (
2
-    id SERIAL PRIMARY KEY,
3
-    event text not null,
4
-    data text not null
5
-);

+ 0
- 0
migrations/2018-09-19-151201_fix_language_column/down.sql View File

@@ -1,2 +0,0 @@

+ 0
- 2
migrations/2018-09-19-151201_fix_language_column/up.sql View File

@@ -1,4 +0,0 @@
1
-
2
-ALTER TABLE quad ALTER language TYPE text;

+ 0
- 1
migrations/2018-11-03-174949_fix-identical-rows/down.sql View File

@@ -1 +0,0 @@
1
-alter table collection_item drop constraint collection_items_unique;

+ 0
- 6
migrations/2018-11-03-174949_fix-identical-rows/up.sql View File

@@ -1,6 +0,0 @@
1
-
2
-
3
-delete from collection_item where collection_item.id not in
4
-    (select distinct on (collection_id, object_id) id from collection_item);
5
-
6
-alter table collection_item add constraint collection_items_unique unique (collection_id, object_id);

+ 38
- 0
schema/db.sql View File

@@ -0,0 +1,38 @@
1
+CREATE TABLE attribute (
2
+    id SERIAL PRIMARY KEY,
3
+    url TEXT UNIQUE NOT NULL
4
+);
5
+
6
+CREATE TABLE quad (
7
+    id SERIAL PRIMARY KEY,
8
+    quad_id integer not null references attribute,
9
+    subject_id integer not null references attribute,
10
+    predicate_id integer not null references attribute,
11
+
12
+    -- either
13
+    attribute_id integer null references attribute,
14
+
15
+    -- or
16
+    object text null,
17
+    type_id integer null references attribute,
18
+    language text null
19
+);
20
+
21
+CREATE TABLE collection_item (
22
+    id SERIAL PRIMARY KEY,
23
+
24
+    collection_id integer not null references attribute,
25
+    object_id integer not null references attribute,
26
+
27
+    constraint collection_items_unique unique (collection_id, object_id)
28
+);
29
+
30
+CREATE TABLE queue_item (
31
+    id SERIAL PRIMARY KEY,
32
+    event text not null,
33
+    data text not null
34
+);
35
+
36
+CREATE INDEX attribute_url on attribute (url);
37
+CREATE INDEX quad_quad_id on quad (quad_id);
38
+CREATE INDEX collection_item_collection on collection_item (collection_id);

+ 26
- 0
src/cache.rs View File

@@ -0,0 +1,26 @@
1
+use crate::DatabaseQuad;
2
+use jsonld::rdf::StringQuad;
3
+use kroeg_tap::StoreItem;
4
+use std::collections::HashMap;
5
+use tokio_postgres::Row;
6
+
7
+pub struct EntityCache {
8
+    pub id_to_uri: HashMap<i32, String>,
9
+    pub uri_to_id: HashMap<String, i32>,
10
+
11
+    pub object: HashMap<String, StoreItem>,
12
+}
13
+
14
+impl EntityCache {
15
+    pub fn cache_attribute_row(&mut self, row: Row) {
16
+        let id: i32 = row.get(0);
17
+        let uri: String = row.get(1);
18
+
19
+        self.id_to_uri.insert(id, uri.to_owned());
20
+        self.uri_to_id.insert(uri, id);
21
+    }
22
+
23
+    pub fn translate_quad(&self, quad: DatabaseQuad) -> StringQuad {
24
+        unimplemented!();
25
+    }
26
+}

+ 0
- 565
src/entitystore.rs View File

@@ -1,565 +0,0 @@
1
-use super::QuadClient;
2
-
3
-use diesel::expression::dsl::sql;
4
-use diesel::pg::types::sql_types::Array;
5
-use diesel::prelude::*;
6
-use diesel::result::Error;
7
-use diesel::sql_types::Integer;
8
-use diesel::{delete, insert_into};
9
-
10
-use futures::future;
11
-use futures::prelude::*;
12
-
13
-use std::collections::{BTreeMap, HashMap};
14
-
15
-use jsonld::rdf::{jsonld_to_rdf, rdf_to_jsonld};
16
-use kroeg_tap::{
17
-    CollectionPointer, EntityStore, QuadQuery, QueryId, QueryObject, QueueItem, QueueStore,
18
-    StoreItem, StoreItemNodeGenerator,
19
-};
20
-use serde_json::Value as JValue;
21
-
22
-use super::models;
23
-
24
-impl QueueItem for models::QueueItem {
25
-    fn event(&self) -> &str {
26
-        &self.event
27
-    }
28
-
29
-    fn data(&self) -> &str {
30
-        &self.data
31
-    }
32
-}
33
-
34
-impl QueueStore for QuadClient {
35
-    type Item = models::QueueItem;
36
-    type Error = Error;
37
-    type GetItemFuture =
38
-        Box<Future<Item = (Option<Self::Item>, Self), Error = (Self::Error, Self)> + Send>;
39
-    type MarkFuture = Box<Future<Item = Self, Error = (Self::Error, Self)> + Send>;
40
-
41
-    fn get_item(self) -> Self::GetItemFuture {
42
-        use models::QueueItem;
43
-        use schema::queue_item::dsl::*;
44
-
45
-        match queue_item
46
-            .order(id)
47
-            .limit(1)
48
-            .get_result::<QueueItem>(&self.connection)
49
-            .optional()
50
-        {
51
-            Ok(Some(val)) => {
52
-                match delete(queue_item.filter(id.eq(val.id))).execute(&self.connection) {
53
-                    Ok(0) => Box::new(future::ok((None, self))),
54
-                    Ok(_) => Box::new(future::ok((Some(val), self))),
55
-                    Err(e) => return Box::new(future::err((e, self))),
56
-                }
57
-            }
58
-            Ok(None) => Box::new(future::ok((None, self))),
59
-            Err(e) => Box::new(future::err((e, self))),
60
-        }
61
-    }
62
-
63
-    fn mark_success(self, _item: models::QueueItem) -> Self::MarkFuture {
64
-        Box::new(future::ok(self))
65
-    }
66
-
67
-    fn mark_failure(self, item: models::QueueItem) -> Self::MarkFuture {
68
-        use models::InsertableQueueItem;
69
-        use schema::queue_item::dsl::*;
70
-
71
-        match insert_into(queue_item)
72
-            .values(&InsertableQueueItem {
73
-                event: item.event,
74
-                data: item.data,
75
-            })
76
-            .execute(&self.connection)
77
-        {
78
-            Ok(_) => Box::new(future::ok(self)),
79
-            Err(e) => Box::new(future::err((e, self))),
80
-        }
81
-    }
82
-
83
-    fn add(self, event: String, data: String) -> Self::MarkFuture {
84
-        use models::InsertableQueueItem;
85
-        use schema::queue_item::dsl::queue_item;
86
-
87
-        match insert_into(queue_item)
88
-            .values(&InsertableQueueItem {
89
-                event: event,
90
-                data: data,
91
-            })
92
-            .execute(&self.connection)
93
-        {
94
-            Ok(_) => Box::new(future::ok(self)),
95
-            Err(e) => Box::new(future::err((e, self))),
96
-        }
97
-    }
98
-}
99
-
100
-impl EntityStore for QuadClient {
101
-    type Error = Error;
102
-    type GetFuture =
103
-        Box<Future<Item = (Option<StoreItem>, Self), Error = (Self::Error, Self)> + Send>;
104
-    type StoreFuture = Box<Future<Item = (StoreItem, Self), Error = (Self::Error, Self)> + Send>;
105
-
106
-    type QueryFuture = future::FutureResult<(Vec<Vec<String>>, Self), (Self::Error, Self)>;
107
-
108
-    type ReadCollectionFuture =
109
-        future::FutureResult<(CollectionPointer, Self), (Self::Error, Self)>;
110
-    type WriteCollectionFuture = future::FutureResult<Self, (Self::Error, Self)>;
111
-
112
-    fn get(mut self, path: String, _local: bool) -> Self::GetFuture {
113
-        if self.cache.contains_key(&path) {
114
-            Box::new(future::ok((self.cache[&path].clone(), self)))
115
-        } else {
116
-            let quads = match self.read_quads(&path) {
117
-                Ok(quads) => quads,
118
-                Err(err) => return Box::new(future::err((err, self))),
119
-            };
120
-
121
-            if quads.len() == 0 {
122
-                Box::new(future::ok((None, self)))
123
-            } else {
124
-                let mut hash = HashMap::new();
125
-                hash.insert("@default".to_owned(), quads);
126
-                match rdf_to_jsonld(hash, true, false) {
127
-                    JValue::Object(jval) => {
128
-                        let jval = JValue::Array(jval.into_iter().map(|(_, b)| b).collect());
129
-                        Box::new(future::ok((
130
-                            Some(StoreItem::parse(&path, jval).unwrap()),
131
-                            self,
132
-                        )))
133
-                    }
134
-                    _ => unreachable!(),
135
-                }
136
-            }
137
-        }
138
-    }
139
-
140
-    fn put(mut self, path: String, item: StoreItem) -> Self::StoreFuture {
141
-        self.cache.remove(&path);
142
-
143
-        let jld = item.to_json();
144
-
145
-        let rdf = match jsonld_to_rdf(jld, &mut StoreItemNodeGenerator::new()) {
146
-            Ok(rdf) => rdf,
147
-            Err(err) => panic!("welp {}", err),
148
-        };
149
-
150
-        let quads = rdf.clone().remove("@default").unwrap();
151
-        if let Err(err) = self.write_quads(&path, quads) {
152
-            return Box::new(future::err((err, self)));
153
-        }
154
-
155
-        Box::new(future::ok((
156
-            StoreItem::parse(&path, rdf_to_jsonld(rdf, true, false)).unwrap(),
157
-            self,
158
-        )))
159
-    }
160
-
161
-    fn read_collection(
162
-        mut self,
163
-        path: String,
164
-        count: Option<u32>,
165
-        cursor: Option<String>,
166
-    ) -> Self::ReadCollectionFuture {
167
-        let path_id = match self.get_attribute_id(&path) {
168
-            Ok(ok) => ok,
169
-            Err(err) => return future::err((err, self)),
170
-        };
171
-
172
-        let mut result = CollectionPointer {
173
-            items: Vec::new(),
174
-            before: None,
175
-            after: None,
176
-            count: None,
177
-        };
178
-
179
-        // !!!! before and after are as in previous and next page respectively!!
180
-        let mut before = i32::min_value();
181
-        let mut after = i32::max_value();
182
-        if let Some(cursor) = cursor {
183
-            let spl: Vec<_> = cursor.split('-').collect();
184
-            if spl.len() == 2 {
185
-                if let Some(val) = spl[1].parse::<i32>().ok() {
186
-                    if spl[0] == "before" {
187
-                        before = val;
188
-                    } else if spl[0] == "after" {
189
-                        after = val;
190
-                    }
191
-                }
192
-            }
193
-        }
194
-
195
-        use models::CollectionItem;
196
-        use schema::collection_item::dsl::*;
197
-
198
-        let count = count.unwrap_or(50u32);
199
-
200
-        let mut items: Vec<CollectionItem> = match if before == i32::min_value() {
201
-            collection_item
202
-                .filter(collection_id.eq(path_id))
203
-                .filter(id.lt(after).and(id.gt(before)))
204
-                .order(id.desc())
205
-                .limit(count as i64)
206
-                .load(&self.connection)
207
-        } else {
208
-            collection_item
209
-                .filter(collection_id.eq(path_id))
210
-                .filter(id.lt(after).and(id.gt(before)))
211
-                .order(id.asc())
212
-                .limit(count as i64)
213
-                .load(&self.connection)
214
-        } {
215
-            Ok(ok) => ok,
216
-            Err(err) => return future::err((err, self)),
217
-        };
218
-
219
-        items.sort_by_key(|f| -f.id);
220
-
221
-        if before != i32::min_value() {
222
-            result.after = Some(format!("after-{}", before));
223
-        }
224
-
225
-        if after != i32::max_value() {
226
-            result.before = Some(format!("before-{}", after));
227
-        }
228
-
229
-        if items.len() > 0 {
230
-            result.before = Some(format!("before-{}", items[0].id));
231
-            if items.len() == count as usize {
232
-                result.after = Some(format!("after-{}", items[(count - 1) as usize].id));
233
-            }
234
-        }
235
-
236
-        let ids = items.into_iter().map(|f| f.object_id).collect();
237
-        match self.get_attributes(&ids) {
238
-            Ok(_) => (),
239
-            Err(err) => return future::err((err, self)),
240
-        };
241
-
242
-        result.items = ids
243
-            .into_iter()
244
-            .map(|f| self.attribute_url[&f].clone())
245
-            .collect();
246
-
247
-        future::ok((result, self))
248
-    }
249
-
250
-    fn read_collection_inverse(mut self, item: String) -> Self::ReadCollectionFuture {
251
-        let item_id = match self.get_attribute_id(&item) {
252
-            Ok(ok) => ok,
253
-            Err(err) => return future::err((err, self)),
254
-        };
255
-
256
-        let mut result = CollectionPointer {
257
-            items: Vec::new(),
258
-            before: None,
259
-            after: None,
260
-            count: None,
261
-        };
262
-
263
-        use models::CollectionItem;
264
-        use schema::collection_item::dsl::*;
265
-
266
-        let items: Vec<CollectionItem> = match collection_item
267
-            .filter(object_id.eq(item_id))
268
-            .load(&self.connection)
269
-        {
270
-            Ok(ok) => ok,
271
-            Err(err) => return future::err((err, self)),
272
-        };
273
-
274
-        let ids = items.into_iter().map(|f| f.collection_id).collect();
275
-        match self.get_attributes(&ids) {
276
-            Ok(_) => (),
277
-            Err(err) => return future::err((err, self)),
278
-        };
279
-
280
-        result.items = ids
281
-            .into_iter()
282
-            .map(|f| self.attribute_url[&f].clone())
283
-            .collect();
284
-
285
-        future::ok((result, self))
286
-    }
287
-
288
-    fn find_collection(mut self, path: String, item: String) -> Self::ReadCollectionFuture {
289
-        use models::CollectionItem;
290
-        use schema::collection_item::dsl::*;
291
-
292
-        let path_id = match self.get_attribute_id(&path) {
293
-            Ok(ok) => ok,
294
-            Err(err) => return future::err((err, self)),
295
-        };
296
-
297
-        let item_id = match self.get_attribute_id(&item) {
298
-            Ok(ok) => ok,
299
-            Err(err) => return future::err((err, self)),
300
-        };
301
-
302
-        let items = collection_item
303
-            .filter(collection_id.eq(path_id).and(object_id.eq(item_id)))
304
-            .load(&self.connection);
305
-
306
-        let items: Vec<CollectionItem> = match items {
307
-            Ok(items) => items,
308
-            Err(e) => return future::err((e, self)),
309
-        };
310
-
311
-        if items.len() != 0 {
312
-            future::ok((
313
-                CollectionPointer {
314
-                    items: vec![item],
315
-                    before: Some(format!("before-{}", items[0].id)),
316
-                    after: Some(format!("after-{}", items[0].id)),
317
-                    count: None,
318
-                },
319
-                self,
320
-            ))
321
-        } else {
322
-            future::ok((
323
-                CollectionPointer {
324
-                    items: vec![],
325
-                    before: None,
326
-                    after: None,
327
-                    count: None,
328
-                },
329
-                self,
330
-            ))
331
-        }
332
-    }
333
-
334
-    fn insert_collection(mut self, path: String, item: String) -> Self::WriteCollectionFuture {
335
-        use models::InsertableCollectionItem;
336
-        use schema::collection_item::dsl::*;
337
-
338
-        let path_id = match self.get_attribute_id(&path) {
339
-            Ok(ok) => ok,
340
-            Err(err) => return future::err((err, self)),
341
-        };
342
-
343
-        let item_id = match self.get_attribute_id(&item) {
344
-            Ok(ok) => ok,
345
-            Err(err) => return future::err((err, self)),
346
-        };
347
-
348
-        if let Err(err) = insert_into(collection_item)
349
-            .values(&InsertableCollectionItem {
350
-                collection_id: path_id,
351
-                object_id: item_id,
352
-            })
353
-            .on_conflict((collection_id, object_id))
354
-            .do_nothing()
355
-            .execute(&self.connection)
356
-        {
357
-            future::err((err, self))
358
-        } else {
359
-            future::ok(self)
360
-        }
361
-    }
362
-
363
-    fn remove_collection(mut self, path: String, item: String) -> Self::WriteCollectionFuture {
364
-        use schema::collection_item::dsl::*;
365
-
366
-        let path_id = match self.get_attribute_id(&path) {
367
-            Ok(ok) => ok,
368
-            Err(err) => return future::err((err, self)),
369
-        };
370
-
371
-        let item_id = match self.get_attribute_id(&item) {
372
-            Ok(ok) => ok,
373
-            Err(err) => return future::err((err, self)),
374
-        };
375
-
376
-        if let Err(err) =
377
-            delete(collection_item.filter(collection_id.eq(path_id).and(object_id.eq(item_id))))
378
-                .execute(&self.connection)
379
-        {
380
-            future::err((err, self))
381
-        } else {
382
-            future::ok(self)
383
-        }
384
-    }
385
-
386
-    fn query(mut self, data: Vec<QuadQuery>) -> Self::QueryFuture {
387
-        let mut placeholders = BTreeMap::new();
388
-        let mut checks = HashMap::new();
389
-        let mut checks_any = HashMap::new();
390
-        let mut others = Vec::new();
391
-        let quad_count = data.len();
392
-
393
-        for (i, QuadQuery(subject, predicate, object)) in data.into_iter().enumerate() {
394
-            match subject {
395
-                QueryId::Value(val) => {
396
-                    checks.insert(format!("quad_{}.quad_id", i), val);
397
-                }
398
-                QueryId::Placeholder(val) => {
399
-                    if !placeholders.contains_key(&val) {
400
-                        placeholders.insert(val, vec![format!("quad_{}.quad_id", i)]);
401
-                    } else {
402
-                        placeholders
403
-                            .get_mut(&val)
404
-                            .unwrap()
405
-                            .push(format!("quad_{}.quad_id", i));
406
-                    }
407
-                }
408
-                QueryId::Any(any) => {
409
-                    checks_any.insert(format!("quad_{}.quad_id", i), any);
410
-                }
411
-                QueryId::Ignore => {}
412
-            }
413
-            match predicate {
414
-                QueryId::Value(val) => {
415
-                    checks.insert(format!("quad_{}.predicate_id", i), val);
416
-                }
417
-                QueryId::Placeholder(val) => {
418
-                    if !placeholders.contains_key(&val) {
419
-                        placeholders.insert(val, vec![format!("quad_{}.predicate_id", i)]);
420
-                    } else {
421
-                        placeholders
422
-                            .get_mut(&val)
423
-                            .unwrap()
424
-                            .push(format!("quad_{}.predicate_id", i));
425
-                    }
426
-                }
427
-                QueryId::Any(any) => {
428
-                    checks_any.insert(format!("quad_{}.predicate_id", i), any);
429
-                }
430
-                QueryId::Ignore => {}
431
-            }
432
-
433
-            match object {
434
-                QueryObject::Id(QueryId::Value(val)) => {
435
-                    checks.insert(format!("quad_{}.attribute_id", i), val);
436
-                }
437
-                QueryObject::Id(QueryId::Placeholder(val)) => {
438
-                    if !placeholders.contains_key(&val) {
439
-                        placeholders.insert(val, vec![format!("quad_{}.attribute_id", i)]);
440
-                    } else {
441
-                        placeholders
442
-                            .get_mut(&val)
443
-                            .unwrap()
444
-                            .push(format!("quad_{}.attribute_id", i));
445
-                    }
446
-                }
447
-
448
-                QueryObject::Id(QueryId::Any(any)) => {
449
-                    checks_any.insert(format!("quad_{}.attribute_id", i), any);
450
-                }
451
-
452
-                QueryObject::Id(QueryId::Ignore) => {}
453
-                QueryObject::Object { value, type_id } => {
454
-                    others.push((format!("quad_{}.object", i), value));
455
-                    match type_id {
456
-                        QueryId::Value(val) => {
457
-                            checks.insert(format!("quad_{}.type_id", i), val);
458
-                        }
459
-                        QueryId::Placeholder(val) => {
460
-                            if !placeholders.contains_key(&val) {
461
-                                placeholders.insert(val, vec![format!("quad_{}.type_id", i)]);
462
-                            } else {
463
-                                placeholders
464
-                                    .get_mut(&val)
465
-                                    .unwrap()
466
-                                    .push(format!("quad_{}.type_id", i));
467
-                            }
468
-                        }
469
-                        QueryId::Any(any) => {
470
-                            checks_any.insert(format!("quad_{}.type_id", i), any);
471
-                        }
472
-                        QueryId::Ignore => {}
473
-                    }
474
-                }
475
-                QueryObject::LanguageString { value, language } => {
476
-                    others.push((format!("quad_{}.object", i), value.to_owned()));
477
-                    others.push((format!("quad_{}.language", i), language));
478
-                }
479
-            }
480
-        }
481
-
482
-        let mut query = String::from("select array[");
483
-        for (i, (_, placeholder)) in placeholders.iter().enumerate() {
484
-            if i != 0 {
485
-                query += ", "
486
-            }
487
-
488
-            query += &placeholder[0];
489
-        }
490
-
491
-        query += "] from ";
492
-
493
-        for i in 0..quad_count {
494
-            if i != 0 {
495
-                query += ", ";
496
-            }
497
-
498
-            query += &format!("quad quad_{}", i);
499
-        }
500
-
501
-        query += " where true ";
502
-        for (a, b) in others {
503
-            query += &format!("and {} = '{}' ", a, b.replace("'", "''"));
504
-        }
505
-
506
-        for (_, placeholder) in placeholders {
507
-            for (a, b) in placeholder.iter().zip(placeholder.iter().skip(1)) {
508
-                query += &format!("and {} = {} ", a, b);
509
-            }
510
-        }
511
-
512
-        {
513
-            let checks_iter = checks.iter().map(|(_, b)| b as &str).chain(
514
-                checks_any
515
-                    .iter()
516
-                    .flat_map(|(_, b)| b.iter().map(|f| f as &str)),
517
-            );
518
-
519
-            if let Err(e) = self.store_attributes(&checks_iter.collect()) {
520
-                return future::err((e, self));
521
-            }
522
-        }
523
-
524
-        for (a, b) in checks {
525
-            let against = self.attribute_id[&b];
526
-
527
-            query += &format!(" and {} = {}", a, against);
528
-        }
529
-
530
-        for (a, b) in checks_any {
531
-            if b.len() == 0 {
532
-                return future::ok((vec![], self));
533
-            }
534
-
535
-            query += &format!(
536
-                " and {} in ({})",
537
-                a,
538
-                b.into_iter()
539
-                    .map(|f| self.attribute_id[&f].to_string())
540
-                    .collect::<Vec<_>>()
541
-                    .join(", ")
542
-            );
543
-        }
544
-
545
-        let data: Vec<Vec<i32>> = match sql::<Array<Integer>>(&query).load(&self.connection) {
546
-            Ok(data) => data,
547
-            Err(e) => return future::err((e, self)),
548
-        };
549
-
550
-        if let Err(e) = self.get_attributes(&data.iter().flatten().map(|f| *f).collect()) {
551
-            return future::err((e, self));
552
-        }
553
-
554
-        future::ok((
555
-            data.into_iter()
556
-                .map(|f| {
557
-                    f.into_iter()
558
-                        .map(|f| self.attribute_url[&f].to_owned())
559
-                        .collect()
560
-                })
561
-                .collect(),
562
-            self,
563
-        ))
564
-    }
565
-}

+ 156
- 265
src/lib.rs View File

@@ -1,309 +1,200 @@
1
-#![allow(proc_macro_derive_resolution_fallback)]
2
-
3
-#[macro_use]
4
-extern crate diesel;
5 1
 extern crate futures;
2
+extern crate futures_state_stream;
6 3
 extern crate jsonld;
7 4
 extern crate kroeg_tap;
8 5
 extern crate serde_json;
6
+extern crate tokio_postgres;
9 7
 
10
-mod entitystore;
11
-mod models;
12
-mod schema;
8
+use futures::{future, stream, Future, Stream};
9
+use futures_state_stream::{StateStream, StreamEvent};
10
+use jsonld::rdf::StringQuad;
11
+use std::collections::HashSet;
12
+use tokio_postgres::{error::Error, Client, Connection, Row, Statement};
13 13
 
14
-use diesel::dsl::any;
15
-use diesel::pg::upsert::excluded;
16
-use diesel::pg::PgConnection;
17
-use diesel::prelude::*;
18
-use diesel::sql_query;
19
-use jsonld::rdf::{QuadContents, StringQuad};
20
-use kroeg_tap::StoreItem;
21
-use std::collections::{HashMap, HashSet};
22
-use std::fmt;
14
+mod statements;
15
+pub use statements::*;
23 16
 
24
-/// A client that talks to a database to store triples keyed by quad.
25
-pub struct QuadClient {
26
-    connection: PgConnection,
27
-    attribute_id: HashMap<String, i32>,
28
-    attribute_url: HashMap<i32, String>,
29
-    cache: HashMap<String, Option<StoreItem>>,
30
-    in_transaction: bool,
31
-}
17
+mod cache;
18
+pub use cache::*;
32 19
 
33
-impl fmt::Debug for QuadClient {
34
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35
-        write!(f, "QuadClient {{ [...] }}")
36
-    }
20
+struct CellarEntityStore {
21
+    connection: Client,
22
+    statements: Statements,
23
+    cache: EntityCache,
37 24
 }
38 25
 
39
-impl QuadClient {
40
-    /// Gets a handle to the underlying connection.
41
-    pub fn connection(&self) -> &PgConnection {
42
-        &self.connection
43
-    }
44
-
45
-    /// Creates a new `QuadClient` from a `PgConnnection`. Multiple clients
46
-    /// can exist safely on one DB without interfering.
47
-    pub fn new(connection: PgConnection) -> QuadClient {
48
-        QuadClient {
49
-            connection: connection,
50
-            attribute_id: HashMap::new(),
51
-            attribute_url: HashMap::new(),
52
-            cache: HashMap::new(),
53
-            in_transaction: false,
54
-        }
55
-    }
56
-
57
-    /// Takes a list of attributes and caches their contents into this client.
58
-    fn process_attributes(&mut self, vals: &Vec<models::Attribute>) {
59
-        for val in vals {
60
-            self.attribute_id.insert(val.url.to_owned(), val.id);
61
-            self.attribute_url.insert(val.id, val.url.to_owned());
62
-        }
63
-    }
26
+pub enum DatabaseQuadContents {
27
+    Id(i32),
28
+    Object { contents: String, type_id: i32 },
29
+    LanguageString { contents: String, language: String },
30
+}
64 31
 
65
-    /// Takes a `Vec` of IRIs, and stores them into the DB where needed,
66
-    /// caching them.
67
-    fn store_attributes(&mut self, vals: &Vec<&str>) -> Result<(), diesel::result::Error> {
68
-        use models::NewAttribute;
69
-        use schema::attribute::dsl::*;
32
+pub struct DatabaseQuad {
33
+    pub id: i32,
34
+    pub quad_id: i32,
35
+    pub subject_id: i32,
36
+    pub predicate_id: i32,
37
+    pub contents: DatabaseQuadContents,
38
+}
70 39
 
71
-        let to_write: Vec<_> = {
72
-            vals.iter()
73
-                .filter(|f| !self.attribute_id.contains_key(**f))
74
-                .map(|f| NewAttribute { url: f })
75
-                .collect()
40
+impl From<&Row> for DatabaseQuad {
41
+    fn from(row: &Row) -> DatabaseQuad {
42
+        let contents = match (row.get(4), row.get(5), row.get(6), row.get(7)) {
43
+            (Some(id), _, _, _) => DatabaseQuadContents::Id(id),
44
+            (_, Some(contents), _, Some(language)) => DatabaseQuadContents::LanguageString {
45
+                contents: contents,
46
+                language: language,
47
+            },
48
+            (_, Some(contents), Some(type_id), _) => DatabaseQuadContents::Object {
49
+                contents: contents,
50
+                type_id: type_id,
51
+            },
52
+            _ => panic!("invalid quad contents; impossible"),
76 53
         };
77 54
 
78
-        if to_write.len() == 0 {
79
-            return Ok(());
55
+        DatabaseQuad {
56
+            id: row.get(0),
57
+            quad_id: row.get(1),
58
+            subject_id: row.get(2),
59
+            predicate_id: row.get(3),
60
+            contents: contents,
80 61
         }
81
-
82
-        let attribute_results = diesel::insert_into(attribute)
83
-            .values(&to_write)
84
-            .on_conflict(url)
85
-            .do_update()
86
-            .set(url.eq(excluded(url)))
87
-            .load(&self.connection)?;
88
-
89
-        self.process_attributes(&attribute_results);
90
-
91
-        Ok(())
92
-    }
93
-
94
-    /// Takes a `Vec` of database IDs and caches them into the local client.
95
-    fn get_attributes(&mut self, vals: &Vec<i32>) -> Result<(), diesel::result::Error> {
96
-        use schema::attribute::dsl::*;
97
-
98
-        let to_read: Vec<_> = {
99
-            vals.iter()
100
-                .filter(|f| !self.attribute_url.contains_key(f))
101
-                .collect()
102
-        };
103
-        if to_read.len() == 0 {
104
-            return Ok(());
105
-        }
106
-
107
-        let attribute_results = attribute
108
-            .filter(id.eq(any(to_read)))
109
-            .load(&self.connection)?;
110
-
111
-        self.process_attributes(&attribute_results);
112
-
113
-        Ok(())
114
-    }
115
-
116
-    /// Loads ALL the attributes into memory.
117
-    pub fn preload_all(&mut self) -> Result<usize, diesel::result::Error> {
118
-        use schema::attribute::dsl::*;
119
-
120
-        let attribute_results = attribute.load(&self.connection)?;
121
-
122
-        self.process_attributes(&attribute_results);
123
-
124
-        Ok(self.attribute_id.len())
125
-    }
126
-
127
-    /// Function that returns a bunch of unique quad IDs. WILL PANIC if you do not preload all first.
128
-    pub fn get_quads(&mut self, after: u32) -> Result<(Vec<String>, u32), diesel::result::Error> {
129
-        use diesel::expression::dsl::sql;
130
-        use diesel::sql_types::Integer;
131
-
132
-        let ids: Vec<i32> = sql::<Integer>(&format!("select distinct on (quad_id) quad_id from quad where quad_id > {} order by quad_id limit 3000", after)).load(&self.connection)?;
133
-        let last_id = ids
134
-            .iter()
135
-            .last()
136
-            .map(|a| *a as u32)
137
-            .unwrap_or(u32::max_value());
138
-
139
-        Ok((
140
-            ids.into_iter()
141
-                .map(|a| self.attribute_url[&a].to_owned())
142
-                .collect(),
143
-            last_id,
144
-        ))
145
-    }
146
-
147
-    /// Gets a single attribute IRI from a database ID.
148
-    pub fn get_attribute_url(&mut self, value: i32) -> Result<String, diesel::result::Error> {
149
-        self.get_attributes(&vec![value])?;
150
-
151
-        Ok(self.attribute_url[&value].to_owned())
152 62
     }
63
+}
153 64
 
154
-    /// Gets a single database ID from an attribute IRI.
155
-    pub fn get_attribute_id(&mut self, value: &str) -> Result<i32, diesel::result::Error> {
156
-        self.store_attributes(&vec![value])?;
65
+impl CellarEntityStore {
66
+    /// Helper method to split up the entity store into its parts.
67
+    fn unwrap(self) -> (Client, Statements, EntityCache) {
68
+        (self.connection, self.statements, self.cache)
69
+    }
70
+
71
+    /// Takes a StateStream of Rows and stores them into the cache.
72
+    fn cache_attribute_rows<T: Stream<Item = Row, Error = Error>>(
73
+        conn: Client,
74
+        cache: EntityCache,
75
+        statements: Statements,
76
+        stream: T,
77
+    ) -> impl Future<Item = CellarEntityStore, Error = (Error, CellarEntityStore)> {
78
+        stream.collect().then(move |future| {
79
+            let mut store = CellarEntityStore {
80
+                connection: conn,
81
+                statements: statements,
82
+                cache: cache,
83
+            };
157 84
 
158
-        Ok(self.attribute_id[value].to_owned())
159
-    }
85
+            match future {
86
+                Ok(rows) => {
87
+                    for row in rows {
88
+                        store.cache.cache_attribute_row(row);
89
+                    }
160 90
 
161
-    /// Takes a `Vec<Quad>` and ensures that all the database IDs that are used
162
-    /// will be cached.
163
-    fn preload_quads(&mut self, quads: &Vec<models::Quad>) -> Result<(), diesel::result::Error> {
164
-        let mut required_ids = HashSet::new();
91
+                    future::ok(store)
92
+                }
165 93
 
166
-        for quad in quads {
167
-            required_ids.insert(quad.subject_id);
168
-            required_ids.insert(quad.predicate_id);
169
-
170
-            if let Some(qval) = quad.attribute_id {
171
-                required_ids.insert(qval);
94
+                Err(e) => future::err((e, store)),
172 95
             }
96
+        })
97
+    }
98
+
99
+    /// Returns a list of quads built from a StateStream.
100
+    fn translate_quad_stream<T: Stream<Item = Row, Error = Error>>(
101
+        conn: Client,
102
+        cache: EntityCache,
103
+        statements: Statements,
104
+        stream: T,
105
+    ) -> impl Future<Item = (Vec<DatabaseQuad>, CellarEntityStore), Error = (Error, CellarEntityStore)>
106
+    {
107
+        stream.map(|f| (&f).into()).collect().then(move |future| {
108
+            let store = CellarEntityStore {
109
+                connection: conn,
110
+                statements: statements,
111
+                cache: cache,
112
+            };
173 113
 
174
-            if let Some(qval) = quad.type_id {
175
-                required_ids.insert(qval);
114
+            match future {
115
+                Ok(items) => future::ok((items, store)),
116
+                Err(e) => future::err((e, store)),
176 117
             }
177
-        }
178
-
179
-        self.get_attributes(&(required_ids.into_iter().collect()))
118
+        })
180 119
     }
181 120
 
182
-    /// Translates a single DB Quad into a `StringQuad`
183
-    fn read_quad(&mut self, quad: &models::Quad) -> StringQuad {
184
-        let contents = if let Some(attribute_id) = quad.attribute_id {
185
-            QuadContents::Id(self.attribute_url[&attribute_id].to_owned())
186
-        } else if let Some(type_id) = quad.type_id {
187
-            QuadContents::Object(
188
-                self.attribute_url[&type_id].to_owned(),
189
-                quad.object.as_ref().unwrap().to_owned(),
190
-                quad.language.as_ref().map(|f| f.to_owned()),
191
-            )
192
-        } else {
193
-            QuadContents::Object(
194
-                "http://www.w3.org/2001/XMLSchema#string".to_owned(),
195
-                quad.object.as_ref().unwrap().to_owned(),
196
-                quad.language.as_ref().map(|f| f.to_owned()),
197
-            )
198
-        };
121
+    /// Takes a slice of Strings, queries them into the database, then stores them into the cache
122
+    pub fn cache_uris(
123
+        self,
124
+        uris: &[String],
125
+    ) -> impl Future<Item = CellarEntityStore, Error = (Error, CellarEntityStore)> {
126
+        // XXX todo: 0 item optimization
127
+        let (mut connection, statements, cache) = self.unwrap();
128
+        let uncached: Vec<_> = uris
129
+            .iter()
130
+            .filter(|&f| !cache.uri_to_id.contains_key(f))
131
+            .collect();
132
+        let query = connection.query(&statements.upsert_attributes, &[&uncached]);
199 133
 
200
-        StringQuad {
201
-            subject_id: self.attribute_url[&quad.subject_id].to_owned(),
202
-            predicate_id: self.attribute_url[&quad.predicate_id].to_owned(),
203
-            contents: contents,
204
-        }
134
+        CellarEntityStore::cache_attribute_rows(connection, cache, statements, query)
205 135
     }
206 136
 
207
-    /// Reads a list of triples from the database, using a graph ID as key.
208
-    pub fn read_quads(&mut self, quadid: &str) -> Result<Vec<StringQuad>, diesel::result::Error> {
209
-        let quadid = self.get_attribute_id(quadid)?;
210
-
211
-        use schema::quad::dsl::*;
137
+    /// Takes a slice of IDs, queries them from the database, and stores them into the cache.
138
+    pub fn cache_ids(
139
+        self,
140
+        ids: &[i32],
141
+    ) -> impl Future<Item = CellarEntityStore, Error = (Error, CellarEntityStore)> {
142
+        let (mut connection, statements, cache) = self.unwrap();
143
+        let uncached: Vec<_> = ids
144
+            .iter()
145
+            .filter(|f| !cache.id_to_uri.contains_key(f))
146
+            .collect();
147
+        let query = connection.query(&statements.select_attributes, &[&uncached]);
212 148
 
213
-        let quads: Vec<models::Quad> = quad.filter(quad_id.eq(quadid)).load(&self.connection)?;
149
+        CellarEntityStore::cache_attribute_rows(connection, cache, statements, query)
150
+    }
214 151
 
215
-        self.preload_quads(&quads)?;
152
+    /// Reads all the quads stored for a specific quad ID.
153
+    fn read_quad(
154
+        self,
155
+        id: i32,
156
+    ) -> impl Future<Item = (Vec<DatabaseQuad>, CellarEntityStore), Error = (Error, CellarEntityStore)>
157
+    {
158
+        let (mut connection, statements, cache) = self.unwrap();
159
+        let query = connection.query(&statements.select_quad, &[&id]);
216 160
 
217
-        Ok(quads.into_iter().map(|f| self.read_quad(&f)).collect())
161
+        CellarEntityStore::translate_quad_stream(connection, cache, statements, query)
218 162
     }
219 163
 
220
-    fn prestore_quads(&mut self, quads: &Vec<StringQuad>) -> Result<(), diesel::result::Error> {
221
-        let mut required_ids: HashSet<&str> = HashSet::new();
164
+    /// Collects all IDs used inside the passed quads. Can be used to cache all the IDs.
165
+    fn collect_quad_ids(&self, quads: &[DatabaseQuad]) -> HashSet<i32> {
166
+        let mut out = HashSet::new();
222 167
 
223 168
         for quad in quads {
224
-            required_ids.insert(&quad.subject_id);
225
-            required_ids.insert(&quad.predicate_id);
169
+            out.insert(quad.quad_id);
170
+            out.insert(quad.subject_id);
171
+            out.insert(quad.predicate_id);
226 172
             match quad.contents {
227
-                QuadContents::Id(ref data) => required_ids.insert(&*data),
228
-                QuadContents::Object(ref data, _, _) => required_ids.insert(&*data),
173
+                DatabaseQuadContents::Id(id) => out.insert(id),
174
+                DatabaseQuadContents::Object { type_id: id, .. } => out.insert(id),
175
+                _ => false,
229 176
             };
230 177
         }
231 178
 
232
-        self.store_attributes(&(required_ids.into_iter().collect()))
233
-    }
234
-
235
-    fn write_quad(&mut self, quad_id: i32, quad: StringQuad) -> models::InsertableQuad {
236
-        let (vattribute_id, type_id, object, lang) = match quad.contents {
237
-            QuadContents::Id(data) => (Some(self.attribute_id[&data]), None, None, None),
238
-            QuadContents::Object(data, object, lang) => {
239
-                (None, Some(self.attribute_id[&data]), Some(object), lang)
240
-            }
241
-        };
242
-
243
-        models::InsertableQuad {
244
-            quad_id: quad_id,
245
-            subject_id: self.attribute_id[&quad.subject_id],
246
-            predicate_id: self.attribute_id[&quad.predicate_id],
247
-            attribute_id: vattribute_id,
248
-            type_id: type_id,
249
-            object: object,
250
-            language: lang,
251
-        }
252
-    }
253
-
254
-    /// Store a list of quads in the DB, keyed by graph ID.
255
-    pub fn write_quads(
256
-        &mut self,
257
-        quadid: &str,
258
-        items: Vec<StringQuad>,
259
-    ) -> Result<(), diesel::result::Error> {
260
-        self.prestore_quads(&items)?;
261
-
262
-        let quadid = self.get_attribute_id(quadid)?;
263
-
264
-        use schema::quad::dsl::*;
265
-
266
-        let items: Vec<_> = items
267
-            .into_iter()
268
-            .map(|f| self.write_quad(quadid, f))
269
-            .collect();
270
-
271
-        diesel::delete(quad.filter(quad_id.eq(quadid))).execute(&self.connection)?;
272
-        diesel::insert_into(quad)
273
-            .values(&items)
274
-            .execute(&self.connection)?;
275
-
276
-        Ok(())
277
-    }
278
-
279
-    pub fn begin_transaction(&mut self) {
280
-        assert!(self.in_transaction == false);
281
-        self.in_transaction = true;
282
-
283
-        sql_query("begin transaction")
284
-            .execute(&self.connection)
285
-            .unwrap();
286
-    }
287
-
288
-    pub fn commit_transaction(&mut self) {
289
-        assert!(self.in_transaction == true);
290
-        self.in_transaction = false;
291
-
292
-        sql_query("commit").execute(&self.connection).unwrap();
179
+        out
293 180
     }
294 181
 
295
-    pub fn rollback_transaction(&mut self) {
296
-        assert!(self.in_transaction == true);
297
-        self.in_transaction = false;
182
+    /// Translates the incoming quads into quads usable with the jsonld crate.
183
+    pub fn translate_quads(
184
+        self,
185
+        quads: Vec<DatabaseQuad>,
186
+    ) -> impl Future<Item = (Vec<StringQuad>, CellarEntityStore), Error = (Error, CellarEntityStore)>
187
+    {
188
+        let items: Vec<_> = self.collect_quad_ids(&quads).into_iter().collect();
298 189
 
299
-        sql_query("rollback").execute(&self.connection).unwrap();
300
-    }
301
-}
302
-
303
-impl Drop for QuadClient {
304
-    fn drop(&mut self) {
305
-        if self.in_transaction {
306
-            self.rollback_transaction();
307
-        }
190
+        self.cache_ids(&items).map(|store| {
191
+            (
192
+                quads
193
+                    .into_iter()
194
+                    .map(|f| store.cache.translate_quad(f))
195
+                    .collect(),
196
+                store,
197
+            )
198
+        })
308 199
     }
309 200
 }

+ 0
- 70
src/models.rs View File

@@ -1,70 +0,0 @@
1
-use schema::*;
2
-
3
-#[derive(Queryable, Debug)]
4
-pub struct Attribute {
5
-    pub id: i32,
6
-    pub url: String,
7
-}
8
-
9
-#[derive(Insertable, Debug)]
10
-#[table_name = "attribute"]
11
-pub struct NewAttribute<'a> {
12
-    pub url: &'a str,
13
-}
14
-
15
-#[derive(Queryable, Debug)]
16
-pub struct Quad {
17
-    pub id: i32,
18
-    pub quad_id: i32,
19
-    pub subject_id: i32,
20
-    pub predicate_id: i32,
21
-
22
-    pub attribute_id: Option<i32>,
23
-
24
-    pub object: Option<String>,
25
-    pub type_id: Option<i32>,
26
-    pub language: Option<String>,
27
-}
28
-
29
-#[derive(Insertable, Debug)]
30
-#[table_name = "quad"]
31
-pub struct InsertableQuad {
32
-    pub quad_id: i32,
33
-    pub subject_id: i32,
34
-    pub predicate_id: i32,
35
-
36
-    pub attribute_id: Option<i32>,
37
-
38
-    pub object: Option<String>,
39
-    pub type_id: Option<i32>,
40
-    pub language: Option<String>,
41
-}
42
-
43
-#[derive(Queryable, Debug)]
44
-pub struct CollectionItem {
45
-    pub id: i32,
46
-
47
-    pub collection_id: i32,
48
-    pub object_id: i32,
49
-}
50
-
51
-#[derive(Insertable, Debug)]
52
-#[table_name = "collection_item"]
53
-pub struct InsertableCollectionItem {
54
-    pub collection_id: i32,
55
-    pub object_id: i32,
56
-}
57
-
58
-#[derive(Queryable, Debug)]
59
-pub struct QueueItem {
60
-    pub id: i32,
61
-    pub event: String,
62
-    pub data: String,
63
-}
64
-
65
-#[derive(Insertable, Debug)]
66
-#[table_name = "queue_item"]
67
-pub struct InsertableQueueItem {
68
-    pub event: String,
69
-    pub data: String,
70
-}

+ 0
- 37
src/schema.rs View File

@@ -1,37 +0,0 @@
1
-table! {
2
-    attribute (id) {
3
-        id -> Int4,
4
-        url -> Text,
5
-    }
6
-}
7
-
8
-table! {
9
-    collection_item (id) {
10
-        id -> Int4,
11
-        collection_id -> Int4,
12
-        object_id -> Int4,
13
-    }
14
-}
15
-
16
-table! {
17
-    quad (id) {
18
-        id -> Int4,
19
-        quad_id -> Int4,
20
-        subject_id -> Int4,
21
-        predicate_id -> Int4,
22
-        attribute_id -> Nullable<Int4>,
23
-        object -> Nullable<Text>,
24
-        type_id -> Nullable<Int4>,
25
-        language -> Nullable<Bpchar>,
26
-    }
27
-}
28
-
29
-table! {
30
-    queue_item (id) {
31
-        id -> Int4,
32
-        event -> Text,
33
-        data -> Text,
34
-    }
35
-}
36
-
37
-allow_tables_to_appear_in_same_query!(attribute, collection_item, quad,);

+ 55
- 0
src/statements.rs View File

@@ -0,0 +1,55 @@
1
+use futures::{future, stream, Future, Stream};
2
+use tokio_postgres::{error::Error, Client, Statement};
3
+
4
+pub struct Statements {
5
+    pub upsert_attributes: Statement,
6
+    pub select_attributes: Statement,
7
+    pub select_quad: Statement,
8
+    pub insert_quads: Statement,
9
+    pub delete_quads: Statement,
10
+}
11
+
12
+const STATEMENTS: &[&'static str] = &[
13
+    // upsert_attributes
14
+    "insert into attribute (url) select unwrap($1) on conflict do update set url = excluded.url returning id, url",
15
+
16
+    // select_attributes
17
+    "select id, url from attribute where id = any($1)",
18
+
19
+    // select_quad
20
+    "select id, quad_id, subject_id, predicate_id, attribute_id, object, type_id, language from quad where quad_id = $1",
21
+
22
+    // insert_quads
23
+    "insert into quad (quad_id, subject_id, predicate_id, attribute_id, object, type_id, language) select unwrap($1), unwrap($2), unwrap($3), unwrap($4), unwrap($5), unwrap($6), unwrap($7)",
24
+
25
+    // delete_quads
26
+    "delete from quad where quad_id = $1",
27
+];
28
+
29
+impl Statements {
30
+    pub fn make(conn: Client) -> impl Future<Item = (Statements, Client), Error = (Error, Client)> {
31
+        stream::iter_ok(STATEMENTS)
32
+            .fold((conn, vec![]), |(mut conn, mut statements), item| {
33
+                conn.prepare(item).then(move |res| match res {
34
+                    Ok(stmt) => {
35
+                        statements.push(stmt);
36
+                        future::ok((conn, statements))
37
+                    }
38
+
39
+                    Err(e) => future::err((e, conn)),
40
+                })
41
+            })
42
+            .map(|(conn, mut stmts)| {
43
+                (
44
+                    Statements {
45
+                        upsert_attributes: stmts.remove(0),
46
+                        select_attributes: stmts.remove(0),
47
+                        select_quad: stmts.remove(0),
48
+                        insert_quads: stmts.remove(0),
49
+                        delete_quads: stmts.remove(0),
50
+                    },
51
+                    conn,
52
+                )
53
+            })
54
+    }
55
+}

Loading…
Cancel
Save