-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcdatabaselet.py
More file actions
103 lines (83 loc) · 3.03 KB
/
cdatabaselet.py
File metadata and controls
103 lines (83 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import logging
logger = logging.getLogger(__name__)
import array
import struct
class CDatabaselet:
    """Reader for a single databaselet file of per-dataset gene-pair values.

    The file starts with ``header_size`` bytes of header. After the header,
    the record of the gene at index ``i`` (in ``append_gene`` order) starts
    at ``header_size + i * get_dataset_size() * gene_total``; within that
    record, the slot for partner gene index ``j`` starts ``j *
    get_dataset_size()`` bytes further in.

    When ``nibble`` is true, two values are packed per byte: the low nibble
    holds the first value and the high nibble the second.
    """

    def __init__(self, cdb_file, hsize, gtotal, dcount, gcount, nibble=True):
        """Store the layout parameters; the file itself is not opened here.

        cdb_file -- path of the databaselet file
        hsize    -- size of the file header in bytes
        gtotal   -- number of partner-gene slots in each gene's record
        dcount   -- number of datasets stored per gene pair
        gcount   -- gene count (only stored and shown in ``repr``)
        nibble   -- True if values are packed two per byte
        """
        self.header_size = hsize
        self.dataset_count = dcount
        self.gene_count = gcount
        self.cdb_file = cdb_file
        self.gene_total = gtotal
        self.nibble = nibble
        self.genes = []
        self.genes_idx = {}

    def append_gene(self, gene):
        """Register ``gene``; its index is its position in insertion order."""
        self.genes_idx[gene] = len(self.genes)
        self.genes.append(gene)

    def __repr__(self):
        # str() on cdb_file so non-string paths do not break join().
        return ':'.join([str(self.cdb_file), str(self.header_size),
                         str(self.dataset_count), str(self.gene_count)])

    def get_dataset_size(self):
        """Return the number of bytes for storing all datasets for a gene pair.

        In nibble mode this is ``dataset_count`` halved and rounded up.
        """
        if self.nibble:
            # Floor division keeps the result an integer byte count; true
            # division would yield a float on Python 3.
            return (self.dataset_count + 1) // 2
        return self.dataset_count

    def get_gene_offset(self, gene):
        """Get the byte offset in the databaselet of the record for ``gene``.

        Raises KeyError if ``gene`` was never added with ``append_gene``.
        """
        # header size + gene index * bytes per gene pair * partner genes
        return (self.header_size
                + self.genes_idx[gene] * self.get_dataset_size() * self.gene_total)

    @staticmethod
    def _unpack_nibbles(packed):
        """Expand each byte into two values: low nibble first, then high."""
        values = []
        for b in packed:
            values.append(b & 0x0F)
            values.append(b >> 4)
        return values

    def _read_values(self, offset, value_count):
        """Read ``value_count`` values starting at byte ``offset``.

        Returns an ``array.array('B')`` in byte mode, or a list of ints in
        nibble mode. ``array.fromfile`` raises EOFError if the file is too
        short for the requested read.
        """
        # The context manager closes the handle even if a read fails.
        with open(self.cdb_file, 'rb') as db_file:
            db_file.seek(int(offset))
            raw = array.array('B')
            if not self.nibble:
                raw.fromfile(db_file, value_count)
                return raw
            # Whole bytes hold value_count // 2 pairs of nibbles.
            raw.fromfile(db_file, value_count // 2)
            values = self._unpack_nibbles(raw)
            if value_count % 2 == 1:
                # Odd count: the final value is the low nibble of one more byte.
                last = struct.unpack('B', db_file.read(1))
                values.append(last[0] & 0x0F)
            return values

    def get_genepair_values(self, g1, g2idx):
        """Return a list of all pairwise dataset values for g1 and g2.

        g1    -- a gene previously registered with ``append_gene``
        g2idx -- index of the partner gene's slot within g1's record
        """
        start = self.get_gene_offset(g1) + g2idx * self.get_dataset_size()
        return self._read_values(start, self.dataset_count)

    def get_gene_values(self, g1):
        """Return all values for g1 to all other genes across all datasets.

        Reads ``dataset_count * gene_total`` values contiguously from the
        start of g1's record.

        NOTE(review): in nibble mode with an odd ``dataset_count`` each
        gene-pair slot is ``get_dataset_size()`` bytes, i.e. one nibble wider
        than ``dataset_count`` values, so this contiguous read interleaves
        those extra nibbles with the data and does not reach the end of the
        record. Behaviour is kept as-is; confirm what callers expect.
        """
        total_vals = self.dataset_count * self.gene_total
        return self._read_values(self.get_gene_offset(g1), total_vals)