1
+ import logging
2
+
3
+
4
+ try :
5
+ from . import hollow_controller as hollow_controller
6
+ except Exception :
7
+ import n0s1 .controllers .hollow_controller as hollow_controller
8
+
9
+
10
+ class GitLabController (hollow_controller .HollowController ):
11
+ def __init__ (self ):
12
+ super ().__init__ ()
13
+ self ._client = None
14
+
15
+ def set_config (self , config = None ):
16
+ super ().set_config (config )
17
+ import gitlab
18
+ TOKEN = config .get ("token" , "" )
19
+ URL = config .get ("url" , "https://gitlab.com" )
20
+ self ._client = gitlab .Gitlab (URL , private_token = TOKEN )
21
+ self ._group = config .get ("group" , "" )
22
+ self ._project = config .get ("project" , "" )
23
+ self ._branch = config .get ("branch" , "" )
24
+ return self .is_connected ()
25
+
26
+ def get_name (self ):
27
+ return "GitLab"
28
+
29
+ def is_connected (self ):
30
+ if self ._client :
31
+ try :
32
+ self ._client .auth ()
33
+ user = self ._client .user
34
+ self .log_message (f"Logged to { self .get_name ()} as { user .username } " )
35
+ return True
36
+ except Exception as e :
37
+ self .log_message (f"Unable to connect to { self .get_name ()} . Check your credentials: { e } " , logging .ERROR )
38
+ return False
39
+ return False
40
+
41
+ def _get_project_obj (self , project_id = None ):
42
+ if self ._project and len (self ._project ) > 0 :
43
+ return self ._client .projects .get (self ._project )
44
+ return self ._client .projects .get (project_id )
45
+
46
+ def _get_projects (self ):
47
+ owner = {}
48
+ projects = []
49
+
50
+ if self ._scan_scope :
51
+ owner = self ._scan_scope .get ("owner" , {})
52
+ for key in self ._scan_scope .get ("projects" , {}):
53
+ projects .append (key )
54
+ if len (projects ) > 0 :
55
+ return projects , owner
56
+
57
+ self .connect ()
58
+ if self ._group and len (self ._group ) > 0 :
59
+ try :
60
+ group = self ._client .groups .get (self ._group )
61
+ projects = group .projects .list (all = True )
62
+ owner = {"type" : "group" , "name" : self ._group }
63
+ except Exception as e :
64
+ message = f"Unable to get projects from group { self ._group } : { e } "
65
+ self .log_message (message , logging .ERROR )
66
+ else :
67
+ projects = self ._client .projects .list (all = True )
68
+ owner = {"type" : "authenticated_user" , "name" : self ._client .user .username }
69
+
70
+ if self ._project and len (self ._project ) > 0 :
71
+ for p in projects :
72
+ if p .path_with_namespace .lower () == self ._project .lower () or str (p .id ) == self ._project :
73
+ return [p ], owner
74
+ try :
75
+ # Try direct access by ID or path
76
+ project = self ._client .projects .get (self ._project )
77
+ return [project ], owner
78
+ except Exception :
79
+ return [], owner
80
+
81
+ return projects , owner
82
+
83
+ def _filter_branches (self , branches , project_id ):
84
+ filtered_branches = branches
85
+ if self ._branch and len (self ._branch ) > 0 :
86
+ filtered_branches = []
87
+ input_branches = self ._branch .split ("," )
88
+ if len (input_branches ) == 1 :
89
+ if input_branches [0 ].lower () == "default" .lower ():
90
+ # Special case for default branch
91
+ self .connect ()
92
+ project_obj = self ._get_project_obj (project_id )
93
+ if project_obj :
94
+ filtered_branches .append (project_obj .default_branch )
95
+ return filtered_branches
96
+ for b in branches :
97
+ branch_name = b .name if hasattr (b , 'name' ) else b
98
+ if branch_name in input_branches and branch_name not in filtered_branches :
99
+ filtered_branches .append (b )
100
+ return filtered_branches
101
+
102
+ def _get_branches (self , project_id , limit = None ):
103
+ branches = []
104
+ if self ._scan_scope :
105
+ project_id_str = str (project_id )
106
+ branches = self ._scan_scope .get ("projects" , {}).get (project_id_str , {}).get ("branches" , {})
107
+ if len (branches ) > 0 :
108
+ return self ._filter_branches (branches , project_id )
109
+
110
+ self .connect ()
111
+ project_obj = self ._get_project_obj (project_id )
112
+ if project_obj :
113
+ branches = project_obj .branches .list (all = True )
114
+ return self ._filter_branches (branches , project_id )
115
+
116
+ def _get_files (self , project_id , branch_name , path = "" , limit = None ):
117
+ files = []
118
+ if self ._scan_scope :
119
+ files = self ._scan_scope .get ("projects" , {}).get (project_id , {}).get ("branches" , {}).get (branch_name , {}).get ("files" , [])
120
+ if len (files ) > 0 :
121
+ return files
122
+
123
+ self .connect ()
124
+ project_obj = self ._get_project_obj (project_id )
125
+ if project_obj :
126
+ try :
127
+ items = project_obj .repository_tree (path = path , ref = branch_name , recursive = True , all = True )
128
+ for item in items :
129
+ if item ['type' ] == 'blob' :
130
+ files .append (item ['path' ])
131
+ except Exception as e :
132
+ message = f"Error listing files from branch { branch_name } : { e } "
133
+ self .log_message (message , logging .ERROR )
134
+ return files
135
+
136
+ def get_mapping (self , levels = - 1 , limit = None ):
137
+ if not self ._client :
138
+ return {}
139
+
140
+ projects , owner = self ._get_projects ()
141
+ map_data = {"owner" : owner , "projects" : {}}
142
+
143
+ if projects :
144
+ for project in projects :
145
+ project_id = project .id if hasattr (project , 'id' ) else project
146
+ project_name = project .path_with_namespace if hasattr (project , 'path_with_namespace' ) else project
147
+
148
+ message = f"Searching in project: { project_name } "
149
+ self .log_message (message , logging .INFO )
150
+
151
+ if project_id :
152
+ p_item = {
153
+ "id" : project_id ,
154
+ "name" : project_name ,
155
+ "branches" : {}
156
+ }
157
+ map_data ["projects" ][project_id ] = p_item
158
+
159
+ if levels > 0 and levels <= 1 :
160
+ continue
161
+
162
+ if branches := self ._get_branches (project_id , limit ):
163
+ for branch in branches :
164
+ branch_name = branch .name if hasattr (branch , 'name' ) else branch
165
+ message = f"Searching in branch: { branch_name } "
166
+ self .log_message (message , logging .INFO )
167
+
168
+ b_item = {
169
+ "name" : branch_name ,
170
+ "files" : {}
171
+ }
172
+
173
+ if branch_name :
174
+ map_data ["projects" ][project_id ]["branches" ][branch_name ] = b_item
175
+
176
+ if levels > 0 and levels <= 2 :
177
+ continue
178
+
179
+ files = self ._get_files (project_id , branch_name )
180
+ map_data ["projects" ][project_id ]["branches" ][branch_name ]["files" ] = files
181
+
182
+ if levels > 0 and levels <= 3 :
183
+ continue
184
+ return map_data
185
+
186
+ def get_data (self , include_comments = False , limit = None ):
187
+ if not self ._client :
188
+ return {}
189
+
190
+ projects = None
191
+ q = self .get_query_from_scope ()
192
+ if q :
193
+ projects = self ._client .projects .list (search = q , all = True )
194
+ if not projects :
195
+ projects , owner = self ._get_projects ()
196
+
197
+ if projects :
198
+ for project in projects :
199
+ # Always get the full project object to ensure all attributes are available
200
+ if hasattr (project , 'id' ):
201
+ project_id = project .id
202
+ # Get the full project object
203
+ try :
204
+ project = self ._client .projects .get (project_id )
205
+ except Exception as e :
206
+ message = f"Error getting full project object for ID { project_id } : { e } "
207
+ self .log_message (message , logging .ERROR )
208
+ continue
209
+ else :
210
+ project_id = project
211
+ try :
212
+ project = self ._get_project_obj (project_id )
213
+ except Exception as e :
214
+ message = f"Error getting project object for ID { project_id } : { e } "
215
+ self .log_message (message , logging .ERROR )
216
+ continue
217
+
218
+ project_name = project .path_with_namespace
219
+ project_url = project .web_url
220
+ project_id = project .id
221
+
222
+ message = f"Searching in project: { project_name } "
223
+ self .log_message (message , logging .INFO )
224
+
225
+ # Iterate through each branch
226
+ for branch in self ._get_branches (project_id ):
227
+ branch_name = branch .name if hasattr (branch , 'name' ) else branch
228
+ message = f"Searching in branch: { branch_name } "
229
+ self .log_message (message , logging .INFO )
230
+
231
+ # Iterate through each file in the branch
232
+ try :
233
+ files = self ._get_files (project_id , branch_name )
234
+ for file_path in files :
235
+ try :
236
+ # Fetch file content
237
+ file_content = project .files .get (file_path = file_path , ref = branch_name )
238
+ raw_content = file_content .decode ()
239
+ # Properly decode bytes to string
240
+ if isinstance (raw_content , bytes ):
241
+ file_data = raw_content .decode ('utf-8' , errors = 'replace' )
242
+ else :
243
+ file_data = raw_content
244
+ url = f"{ project_url } /-/blob/{ branch_name } /{ file_path } "
245
+ file = self .pack_data (file_data , url )
246
+ yield file
247
+ except Exception as e :
248
+ message = f"Error accessing file { file_path } from branch { branch_name } : { e } "
249
+ self .log_message (message , logging .ERROR )
250
+ except Exception as e :
251
+ message = f"Error accessing branch { branch_name } : { e } "
252
+ self .log_message (message , logging .ERROR )
253
+
254
+ def post_comment (self , issue , comment ):
255
+ if not self ._client :
256
+ return False
257
+ message = f"Unable to post comment to { issue } !"
258
+ self .log_message (message , logging .ERROR )
259
+ return False
260
+
261
+ def pack_data (self , file_data , url ):
262
+ ticket_data = {
263
+ "ticket" : {
264
+ "file" : {
265
+ "name" : "file" ,
266
+ "data" : file_data ,
267
+ "data_type" : "str"
268
+ },
269
+ },
270
+ "url" : url ,
271
+ "issue_id" : url
272
+ }
273
+ return ticket_data
0 commit comments