123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- # -*- coding: utf-8 -*-
- """
- Module for formatting output data into CSV files.
- """
- from __future__ import print_function
- import csv as csvlib
- import os
- import warnings
- from zipfile import ZipFile
- import numpy as np
- from pandas._libs import writers as libwriters
- from pandas.compat import StringIO, range, zip
- from pandas.core.dtypes.generic import (
- ABCDatetimeIndex, ABCIndexClass, ABCMultiIndex, ABCPeriodIndex)
- from pandas.core.dtypes.missing import notna
- from pandas import compat
- from pandas.io.common import (
- UnicodeWriter, _get_handle, _infer_compression, get_filepath_or_buffer)
- class CSVFormatter(object):
- def __init__(self, obj, path_or_buf=None, sep=",", na_rep='',
- float_format=None, cols=None, header=True, index=True,
- index_label=None, mode='w', nanRep=None, encoding=None,
- compression='infer', quoting=None, line_terminator='\n',
- chunksize=None, tupleize_cols=False, quotechar='"',
- date_format=None, doublequote=True, escapechar=None,
- decimal='.'):
- self.obj = obj
- if path_or_buf is None:
- path_or_buf = StringIO()
- self.path_or_buf, _, _, _ = get_filepath_or_buffer(
- path_or_buf, encoding=encoding, compression=compression, mode=mode
- )
- self.sep = sep
- self.na_rep = na_rep
- self.float_format = float_format
- self.decimal = decimal
- self.header = header
- self.index = index
- self.index_label = index_label
- self.mode = mode
- if encoding is None:
- encoding = 'ascii' if compat.PY2 else 'utf-8'
- self.encoding = encoding
- self.compression = _infer_compression(self.path_or_buf, compression)
- if quoting is None:
- quoting = csvlib.QUOTE_MINIMAL
- self.quoting = quoting
- if quoting == csvlib.QUOTE_NONE:
- # prevents crash in _csv
- quotechar = None
- self.quotechar = quotechar
- self.doublequote = doublequote
- self.escapechar = escapechar
- self.line_terminator = line_terminator or os.linesep
- self.date_format = date_format
- self.tupleize_cols = tupleize_cols
- self.has_mi_columns = (isinstance(obj.columns, ABCMultiIndex) and
- not self.tupleize_cols)
- # validate mi options
- if self.has_mi_columns:
- if cols is not None:
- raise TypeError("cannot specify cols with a MultiIndex on the "
- "columns")
- if cols is not None:
- if isinstance(cols, ABCIndexClass):
- cols = cols.to_native_types(na_rep=na_rep,
- float_format=float_format,
- date_format=date_format,
- quoting=self.quoting)
- else:
- cols = list(cols)
- self.obj = self.obj.loc[:, cols]
- # update columns to include possible multiplicity of dupes
- # and make sure sure cols is just a list of labels
- cols = self.obj.columns
- if isinstance(cols, ABCIndexClass):
- cols = cols.to_native_types(na_rep=na_rep,
- float_format=float_format,
- date_format=date_format,
- quoting=self.quoting)
- else:
- cols = list(cols)
- # save it
- self.cols = cols
- # preallocate data 2d list
- self.blocks = self.obj._data.blocks
- ncols = sum(b.shape[0] for b in self.blocks)
- self.data = [None] * ncols
- if chunksize is None:
- chunksize = (100000 // (len(self.cols) or 1)) or 1
- self.chunksize = int(chunksize)
- self.data_index = obj.index
- if (isinstance(self.data_index, (ABCDatetimeIndex, ABCPeriodIndex)) and
- date_format is not None):
- from pandas import Index
- self.data_index = Index([x.strftime(date_format) if notna(x) else
- '' for x in self.data_index])
- self.nlevels = getattr(self.data_index, 'nlevels', 1)
- if not index:
- self.nlevels = 0
- def save(self):
- """
- Create the writer & save
- """
- # GH21227 internal compression is not used when file-like passed.
- if self.compression and hasattr(self.path_or_buf, 'write'):
- msg = ("compression has no effect when passing file-like "
- "object as input.")
- warnings.warn(msg, RuntimeWarning, stacklevel=2)
- # when zip compression is called.
- is_zip = isinstance(self.path_or_buf, ZipFile) or (
- not hasattr(self.path_or_buf, 'write')
- and self.compression == 'zip')
- if is_zip:
- # zipfile doesn't support writing string to archive. uses string
- # buffer to receive csv writing and dump into zip compression
- # file handle. GH21241, GH21118
- f = StringIO()
- close = False
- elif hasattr(self.path_or_buf, 'write'):
- f = self.path_or_buf
- close = False
- else:
- f, handles = _get_handle(self.path_or_buf, self.mode,
- encoding=self.encoding,
- compression=self.compression)
- close = True
- try:
- writer_kwargs = dict(lineterminator=self.line_terminator,
- delimiter=self.sep, quoting=self.quoting,
- doublequote=self.doublequote,
- escapechar=self.escapechar,
- quotechar=self.quotechar)
- if self.encoding == 'ascii':
- self.writer = csvlib.writer(f, **writer_kwargs)
- else:
- writer_kwargs['encoding'] = self.encoding
- self.writer = UnicodeWriter(f, **writer_kwargs)
- self._save()
- finally:
- if is_zip:
- # GH17778 handles zip compression separately.
- buf = f.getvalue()
- if hasattr(self.path_or_buf, 'write'):
- self.path_or_buf.write(buf)
- else:
- f, handles = _get_handle(self.path_or_buf, self.mode,
- encoding=self.encoding,
- compression=self.compression)
- f.write(buf)
- close = True
- if close:
- f.close()
- for _fh in handles:
- _fh.close()
- def _save_header(self):
- writer = self.writer
- obj = self.obj
- index_label = self.index_label
- cols = self.cols
- has_mi_columns = self.has_mi_columns
- header = self.header
- encoded_labels = []
- has_aliases = isinstance(header, (tuple, list, np.ndarray,
- ABCIndexClass))
- if not (has_aliases or self.header):
- return
- if has_aliases:
- if len(header) != len(cols):
- raise ValueError(('Writing {ncols} cols but got {nalias} '
- 'aliases'.format(ncols=len(cols),
- nalias=len(header))))
- else:
- write_cols = header
- else:
- write_cols = cols
- if self.index:
- # should write something for index label
- if index_label is not False:
- if index_label is None:
- if isinstance(obj.index, ABCMultiIndex):
- index_label = []
- for i, name in enumerate(obj.index.names):
- if name is None:
- name = ''
- index_label.append(name)
- else:
- index_label = obj.index.name
- if index_label is None:
- index_label = ['']
- else:
- index_label = [index_label]
- elif not isinstance(index_label,
- (list, tuple, np.ndarray, ABCIndexClass)):
- # given a string for a DF with Index
- index_label = [index_label]
- encoded_labels = list(index_label)
- else:
- encoded_labels = []
- if not has_mi_columns or has_aliases:
- encoded_labels += list(write_cols)
- writer.writerow(encoded_labels)
- else:
- # write out the mi
- columns = obj.columns
- # write out the names for each level, then ALL of the values for
- # each level
- for i in range(columns.nlevels):
- # we need at least 1 index column to write our col names
- col_line = []
- if self.index:
- # name is the first column
- col_line.append(columns.names[i])
- if isinstance(index_label, list) and len(index_label) > 1:
- col_line.extend([''] * (len(index_label) - 1))
- col_line.extend(columns._get_level_values(i))
- writer.writerow(col_line)
- # Write out the index line if it's not empty.
- # Otherwise, we will print out an extraneous
- # blank line between the mi and the data rows.
- if encoded_labels and set(encoded_labels) != {''}:
- encoded_labels.extend([''] * len(columns))
- writer.writerow(encoded_labels)
- def _save(self):
- self._save_header()
- nrows = len(self.data_index)
- # write in chunksize bites
- chunksize = self.chunksize
- chunks = int(nrows / chunksize) + 1
- for i in range(chunks):
- start_i = i * chunksize
- end_i = min((i + 1) * chunksize, nrows)
- if start_i >= end_i:
- break
- self._save_chunk(start_i, end_i)
- def _save_chunk(self, start_i, end_i):
- data_index = self.data_index
- # create the data for a chunk
- slicer = slice(start_i, end_i)
- for i in range(len(self.blocks)):
- b = self.blocks[i]
- d = b.to_native_types(slicer=slicer, na_rep=self.na_rep,
- float_format=self.float_format,
- decimal=self.decimal,
- date_format=self.date_format,
- quoting=self.quoting)
- for col_loc, col in zip(b.mgr_locs, d):
- # self.data is a preallocated list
- self.data[col_loc] = col
- ix = data_index.to_native_types(slicer=slicer, na_rep=self.na_rep,
- float_format=self.float_format,
- decimal=self.decimal,
- date_format=self.date_format,
- quoting=self.quoting)
- libwriters.write_csv_rows(self.data, ix, self.nlevels,
- self.cols, self.writer)
|