123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- import numpy as np
- from ctypes import *
- import matplotlib.pyplot as plt
- import pandas as pd
- from multiprocessing import Pool, Process, freeze_support
- from difflib import SequenceMatcher
- import itertools
- from functools import partial
- #test seq about 500 nucleotides from e-coli genome
- seq = """atcaatgatcaacgtaagcttctaagcatgatcaaggtgctcacacagtttatccacaac
- ctgagtggatgacatcaagataggtcgttgtatctccttcctctcgtactctcatgacca
- cggaaagatgatcaagagaggatgatttcttggccatatcgcaatgaatacttgtgactt
- gtgcttccaattgacatcttcagcgccatattgcgctggccaaggtgacggagcgggatt
- acgaaagcatgatcatggctgttgttctgtttatcttgttttgactgagacttgttagga
- tagacggtttttcatcactgactagccaaagccttactctgcctgacatcgaccgtaaat
- tgataatgaatttacatgcttccgcgacgatttacctcttgatcatcgatccgattgaag
- atcttcaattgttaattctcttgcctcgactcatagccatgatgagctcttgatcatgtt
- tccttaaccctctattttttacggaagaatgatcaagctgctgctcttgatcatcgtttc"""
- #human_dna = pd.read_table("input/dna/human.txt")
- base_pairs = {'G': 'C', 'C': 'G', 'T': 'A', 'A': 'T'}
- #refactor this to return starting locations in genome as well
- def find_kmers(dna, filter_size: int):
- dna_len = len(dna)
- if(filter_size > dna_len): raise Exception("Filter size should not be larger than DNA Length")
- print(f'sequencing dna of length {dna_len}')
- seq_list = []
- kmer_list = dict()
- for i in range(0, (dna_len-filter_size)):
- segment = dna[i:i+filter_size]
- if(seq_list.count(segment) > 0):
- if(kmer_list.get(segment)):
- kmer_list[segment] = kmer_list[segment] + 1
- #once it appears a second time we add it to the kmer list with a count of 2
- else:
- kmer_list[segment] = 2
- #since we havent sequenced this yet we will add it to the list
- else:
- seq_list.append(segment)
-
- #kind of shit until i feel like making a temporary dictionary to put sequences that only appear once
- for key in kmer_list:
- if(kmer_list[key] < 2):
- del kmer_list[key]
-
- return kmer_list
- #find kmers with a variability of max_diff SNPs
- def find_kmers_with_differences(dna, filter_size: int, max_diff: int):
- if(max_diff > filter_size): raise Exception("Max difference should not be larger than filter size")
- dna_len = len(dna)
- if(filter_size > dna_len): raise Exception("Filter size should not be larger than DNA Length")
- print(f'sequencing dna of length {dna_len}')
- leniancy_ratio = float(max_diff/filter_size)
- seq_list = []
- kmer_list = dict()
- for i in range(0, (dna_len-filter_size)):
- segment = dna[i:i+filter_size]
- if(seq_list.count(segment) > 0):
- if(kmer_list.get(segment)):
- kmer_list[segment] = kmer_list[segment] + 1
- #once it appears a second time we add it to the kmer list with a count of 2
- else:
- kmer_list[segment] = 2
- #since we havent sequenced this yet we will add it to the list
- else:
- if(len(seq_list) > 0):
- with Pool(4) as p:
- pOutput = p.map(partial(check_for_discreprency, comparison=segment), seq_list)
- print(pOutput)
- seq_list.append(segment)
-
- #kind of shit until i feel like making a temporary dictionary to put sequences that only appear once
- for key in kmer_list:
- if(kmer_list[key] < 2):
- del kmer_list[key]
-
- return kmer_list
- def check_for_discreprency(segment, comparison):
- print(f'comparing {segment} and {comparison}')
- match = SequenceMatcher(None, segment, comparison).ratio()
- print(match)
- return match
- def find_specific_kmer_with_differences(dna, kmer, max_diff: int):
- dna_len = len(dna)
- if(filter_size > dna_len): raise Exception("Filter size should not be larger than DNA Length")
- print(f'sequencing dna of length {dna_len}')
- seq_list = []
- kmer_list = dict()
- for i in range(0, (dna_len-filter_size)):
- segment = dna[i:i+filter_size]
- if(seq_list.count(segment) > 0):
- if(kmer_list.get(segment)):
- kmer_list[segment] = kmer_list[segment] + 1
- #once it appears a second time we add it to the kmer list with a count of 2
- else:
- kmer_list[segment] = 2
- #since we havent sequenced this yet we will add it to the list
- else:
- seq_list.append(segment)
-
- #kind of shit until i feel like making a temporary dictionary to put sequences that only appear once
- for key in kmer_list:
- if(kmer_list[key] < 2):
- del kmer_list[key]
-
- return kmer_list
-
- def compliment(kmer):
- compliment = ""
- for i in range(0, len(kmer)):
- compliment += base_pairs[kmer]
- return compliment
- def reverse_compliment(kmer):
- #since we bind with the reverse
- reverse = kmer[::-1]
- compliment = ""
- for i in range(0, len(reverse)):
- compliment += base_pairs[reverse[i]]
-
- return compliment
- def skew(genome):
- x = np.arange(0, len(genome), 1)
- skewData = np.empty(shape=len(genome) + 1, dtype=int)
- skewData[0] = 0
- print(len(skewData))
- for i in range(1, len(genome) + 1):
- if(genome[i-1] == 'G'):
- skewData[i] = (skewData[i-1] + 1)
- elif(genome[i-1] == 'C'):
- skewData[i] = (skewData[i-1] - 1)
- else:
- skewData[i] = skewData[i-1]
- return skewData
-
- kmer_dict = dict()
- def main():
- #most DnaA boxes appear in 3-9mers
- kmer_dict = find_kmers_with_differences(seq.upper().replace('\n', ''), 9, 4)
-
- print(kmer_dict)
- skewData = skew(seq.upper())
- plt.plot(skewData)
- plt.xlabel('nucleotide position')
- plt.ylabel('skew')
- plt.show()
- if __name__ == '__main__':
- freeze_support()
- main()
|